-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Front end rehaul #212
Front end rehaul #212
Changes from 4 commits
5704ca2
b981d3a
8b5c151
75d47b6
d99eefc
bc40298
71bf64a
2095253
c6e4d6d
d3e9874
e8f6b91
035745e
4674617
831def7
6e6026c
532c69b
6578e7d
35b2fbe
c1097e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -19,6 +19,7 @@ lazy val V = new { | |||||
val scalatest = "3.2.3" | ||||||
val silencer = "1.7.1" | ||||||
val zio = "1.0.3" | ||||||
val `zio-s3` = "latest.integration" | ||||||
val `zio-interop` = "2.2.0.1" | ||||||
val `zio-kafka` = "0.13.0" | ||||||
} | ||||||
|
@@ -78,7 +79,9 @@ lazy val D = new { | |||||
val zio = Seq( | ||||||
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`, | ||||||
"dev.zio" %% "zio-kafka" % V.`zio-kafka`, | ||||||
"dev.zio" %% "zio-streams" % V.zio | ||||||
"dev.zio" %% "zio-s3" % V.`zio-s3`, | ||||||
"dev.zio" %% "zio-streams" % V.zio, | ||||||
"dev.zio" %% "zio-test" % V.zio | ||||||
) | ||||||
} | ||||||
|
||||||
|
@@ -135,15 +138,15 @@ lazy val commonSettings = Seq( | |||||
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"), | ||||||
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")), | ||||||
scalacOptions ++= versionDependent(scalaVersion.value), | ||||||
resolvers += "confluent" at "https://packages.confluent.io/maven/" | ||||||
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/", "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. which dependency needs snapshots ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will remove this, already removed the zio-s3 dep There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This we can probably revert too here |
||||||
) | ||||||
|
||||||
lazy val tamer = project | ||||||
.in(file("core")) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
name := "tamer", | ||||||
libraryDependencies ++= (D.cats ++ D.config ++ D.doobie ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio) | ||||||
libraryDependencies ++= (D.cats ++ D.config ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio) | ||||||
.map(_.withSources) | ||||||
.map(_.withJavadoc), | ||||||
libraryDependencies ++= D.avro, | ||||||
|
@@ -152,10 +155,19 @@ lazy val tamer = project | |||||
Test / console / scalacOptions := (Compile / console / scalacOptions).value | ||||||
) | ||||||
|
||||||
lazy val doobie = project | ||||||
.in(file("doobie")) | ||||||
.dependsOn(tamer) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
name := "doobie", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we probably want to prefix all modules' artifacts with
Suggested change
And likely There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As opposed to just just |
||||||
libraryDependencies ++= D.doobie | ||||||
) | ||||||
|
||||||
lazy val example = project | ||||||
.in(file("example")) | ||||||
.enablePlugins(JavaAppPackaging) | ||||||
.dependsOn(tamer) | ||||||
.dependsOn(tamer, doobie) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
libraryDependencies ++= D.postgres, | ||||||
|
@@ -164,7 +176,7 @@ lazy val example = project | |||||
|
||||||
lazy val root = project | ||||||
.in(file(".")) | ||||||
.aggregate(tamer, example) | ||||||
.aggregate(tamer, example, doobie) | ||||||
.settings(commonSettings) | ||||||
.settings( | ||||||
publish / skip := true, | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,48 +1,12 @@ | ||
package tamer | ||
|
||
import com.sksamuel.avro4s._ | ||
import doobie.util.query.Query0 | ||
import tamer.registry.{Registry, Topic} | ||
import zio.UIO | ||
import zio.kafka.serde.Serializer | ||
|
||
final case class ResultMetadata(queryExecutionTime: Long) | ||
final case class QueryResult[V](metadata: ResultMetadata, results: List[V]) | ||
|
||
final case class Setup[K, V, State]( | ||
keySerializer: Serializer[Registry with Topic, K], | ||
valueSerializer: Serializer[Registry with Topic, V], | ||
stateSerde: ZSerde[Registry with Topic, State], | ||
valueToKey: V => K, | ||
defaultState: State, | ||
buildQuery: State => Query0[V], | ||
stateFoldM: State => QueryResult[V] => UIO[State] | ||
abstract class Setup[-K, -V, S]( | ||
val keySerializer: Serializer[Registry with Topic, K], | ||
val valueSerializer: Serializer[Registry with Topic, V], | ||
val stateSerde: ZSerde[Registry with Topic, S], | ||
val defaultState: S, | ||
val queryHash: Int | ||
gurghet marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
|
||
object Setup { | ||
final def avro[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor, State <: Product: Decoder: Encoder: SchemaFor]( | ||
defaultState: State | ||
)( | ||
buildQuery: State => Query0[V] | ||
)( | ||
valueToKey: V => K, | ||
stateFoldM: State => QueryResult[V] => UIO[State] | ||
): Setup[K, V, State] = | ||
Setup(Serde[K](isKey = true).serializer, Serde[V]().serializer, Serde[State]().serde, valueToKey, defaultState, buildQuery, stateFoldM) | ||
|
||
final def avroSimple[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor]( | ||
defaultState: V | ||
)( | ||
buildQuery: V => Query0[V], | ||
valueToKey: V => K | ||
): Setup[K, V, V] = | ||
Setup( | ||
Serde[K](isKey = true).serializer, | ||
Serde[V]().serializer, | ||
Serde[V]().serde, | ||
valueToKey, | ||
defaultState, | ||
buildQuery, | ||
_ => r => UIO(r.results.last) | ||
) | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,8 +13,6 @@ import zio.interop.catz._ | |
import scala.concurrent.duration.FiniteDuration | ||
|
||
object Config { | ||
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like there's two refinement types left in |
||
final case class Query(fetchChunkSize: PosInt) | ||
final case class KafkaSink(topic: NonEmptyString) | ||
final case class KafkaState(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString) | ||
final case class Kafka( | ||
|
@@ -25,20 +23,10 @@ object Config { | |
sink: KafkaSink, | ||
state: KafkaState | ||
) | ||
final case class Tamer(db: Db, query: Query, kafka: Kafka) | ||
|
||
private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] = | ||
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode) | ||
|
||
private[this] val dbConfigValue = ( | ||
env("DATABASE_DRIVER").as[NonEmptyString], | ||
env("DATABASE_URL").as[UriString], | ||
env("DATABASE_USERNAME").as[NonEmptyString], | ||
env("DATABASE_PASSWORD").as[Password].redacted | ||
).parMapN(Db) | ||
|
||
private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query) | ||
|
||
private[this] val kafkaSinkConfigValue = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSink) | ||
private[this] val kafkaStateConfigValue = ( | ||
env("KAFKA_STATE_TOPIC").as[NonEmptyString], | ||
|
@@ -53,17 +41,12 @@ object Config { | |
kafkaSinkConfigValue, | ||
kafkaStateConfigValue | ||
).parMapN(Kafka) | ||
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply) | ||
|
||
trait Service { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Who's using this ? is it needed ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably hereditary of when this was a module, will remove |
||
val dbConfig: URIO[DbConfig, Db] | ||
val queryConfig: URIO[QueryConfig, Query] | ||
val kafkaConfig: URIO[KafkaConfig, Kafka] | ||
} | ||
|
||
val live: Layer[TamerError, TamerConfig] = ZLayer.fromEffectMany { | ||
tamerConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { | ||
case Tamer(db, query, kafka) => Has(db) ++ Has(query) ++ Has(kafka) | ||
} | ||
val live: Layer[TamerError, KafkaConfig] = ZLayer.fromEffect { | ||
kafkaConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,12 +12,7 @@ package object config { | |
type UriString = String Refined Uri | ||
type UrlString = String Refined Url | ||
|
||
type DbConfig = Has[Config.Db] | ||
type QueryConfig = Has[Config.Query] | ||
type KafkaConfig = Has[Config.Kafka] | ||
type TamerConfig = DbConfig with QueryConfig with KafkaConfig | ||
|
||
val dbConfig: URIO[DbConfig, Config.Db] = ZIO.access(_.get) | ||
val queryConfig: URIO[QueryConfig, Config.Query] = ZIO.access(_.get) | ||
val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used anywhere ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree with @barambani here too. I think I had added this at the time and there's places where it could be used but... it's not, so we might as well remove it |
||
} |
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,8 +1,6 @@ | ||||||
package tamer | ||||||
package kafka | ||||||
|
||||||
import java.security.MessageDigest | ||||||
|
||||||
import eu.timepit.refined.auto._ | ||||||
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient} | ||||||
import log.effect.LogWriter | ||||||
|
@@ -37,10 +35,6 @@ object Kafka { | |||||
val live: URLayer[KafkaConfig, Kafka] = ZLayer.fromService { cfg => | ||||||
new Service { | ||||||
private[this] val logTask: Task[LogWriter[Task]] = log4sFromName.provide("tamer.kafka") | ||||||
private[this] def stateKeyTask[A](a: A, s: String)(f: A => String): Task[StateKey] = | ||||||
Task(MessageDigest.getInstance("SHA-1")).map { md => | ||||||
StateKey(md.digest(f(a).getBytes).take(7).map(b => f"$b%02x").mkString, s) | ||||||
} | ||||||
|
||||||
override final def runLoop[K, V, State, R](setup: Setup[K, V, State])( | ||||||
f: (State, Queue[(K, V)]) => ZIO[R, TamerError, State] | ||||||
|
@@ -67,7 +61,7 @@ object Kafka { | |||||
def sink(q: Queue[(K, V)], p: Producer.Service[Registry with Topic, K, V], layer: ULayer[Registry with Topic]) = | ||||||
logTask.flatMap { log => | ||||||
q.takeAll.flatMap { | ||||||
case Nil => log.trace("no data to push") *> ZIO.unit | ||||||
case Nil => log.trace("no data to push").unit | ||||||
case kvs => | ||||||
p.produceChunkAsync(mkRecordChunk(kvs)).provideSomeLayer[Blocking](layer).retry(tenTimes).flatten.unit <* | ||||||
log.info(s"pushed ${kvs.size} messages to ${cfg.sink.topic}") | ||||||
|
@@ -83,7 +77,7 @@ object Kafka { | |||||
sp: Producer.Service[Registry with Topic, StateKey, State], | ||||||
layer: ULayer[Registry with Topic] | ||||||
) = | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with not having it everywhere, but here the type annotation might help reading it
Suggested change
|
||||||
ZStream.fromEffect(logTask <*> stateKeyTask(setup.defaultState, cfg.state.groupId)(setup.buildQuery(_).sql)).flatMap { | ||||||
ZStream.fromEffect(logTask <*> UIO(StateKey(setup.queryHash.toHexString, cfg.state.groupId))).flatMap { // TODO: no need for UIO, it's pure | ||||||
case (log, stateKey) => | ||||||
ZStream | ||||||
.fromEffect(subscribe(sc)) | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,44 @@ | ||||||||||||
package tamer.db | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've seen this done often, but, doesn't this actually go against the principle of least privilege? |
||||||||||||
|
||||||||||||
import ciris.{ConfigException, env} | ||||||||||||
import ciris.refined.refTypeConfigDecoder | ||||||||||||
import cats.implicits._ | ||||||||||||
import eu.timepit.refined.types.numeric.PosInt | ||||||||||||
import eu.timepit.refined.types.string.NonEmptyString | ||||||||||||
import tamer.TamerError | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
import tamer.config.{Password, UriString} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these would be part of the package object
Suggested change
|
||||||||||||
import zio.interop.catz.{taskConcurrentInstance, zioContextShift} | ||||||||||||
import zio.{Has, Layer, Task, URIO, ZIO, ZLayer} | ||||||||||||
|
||||||||||||
object ConfigDb { | ||||||||||||
type DbConfig = Has[Db] | ||||||||||||
type QueryConfig = Has[Query] | ||||||||||||
|
||||||||||||
val dbConfig: URIO[DbConfig, Db] = ZIO.service | ||||||||||||
val queryConfig: URIO[QueryConfig, Query] = ZIO.service | ||||||||||||
|
||||||||||||
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password) | ||||||||||||
final case class Query(fetchChunkSize: PosInt) | ||||||||||||
final case class DatabaseConfig(db: Db, query: Query) | ||||||||||||
|
||||||||||||
private[this] val dbConfigValue = ( | ||||||||||||
env("DATABASE_DRIVER").as[NonEmptyString], | ||||||||||||
env("DATABASE_URL").as[UriString], | ||||||||||||
env("DATABASE_USERNAME").as[NonEmptyString], | ||||||||||||
env("DATABASE_PASSWORD").as[Password].redacted | ||||||||||||
).parMapN(Db) | ||||||||||||
private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query) | ||||||||||||
|
||||||||||||
private[this] val deleteMeConfigValue = (dbConfigValue, queryConfigValue).parMapN(DatabaseConfig.apply) | ||||||||||||
|
||||||||||||
trait Service { | ||||||||||||
val dbConfig: URIO[DbConfig, Db] | ||||||||||||
val queryConfig: URIO[QueryConfig, Query] | ||||||||||||
} | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can all go, right?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically yes, but maybe it's not the best form, anyways we can refactor later to decide who deserves to be a service and who doesn't |
||||||||||||
|
||||||||||||
val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
deleteMeConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was to be deleted? Maybe just folding the two things into one config will do too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yeah, I eventually renamed it but the IDE refactoring algo messed up here. I'll rename it for now. The |
||||||||||||
case DatabaseConfig(db, query) => Has(db) ++ Has(query) | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Coming soon? 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/zio/zio-s3/releases/tag/v0.2.5