-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Front end rehaul #212
Front end rehaul #212
Conversation
log <- logTask | ||
_ <- log.info("Starting tamer...") | ||
boot <- UIO(Instant.now()) | ||
setup = tamer.db.mkSetup(ts => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A more general setup is currently missing, this particular setup is constrained to use Datable
types and leverages the assumption that we are working with time series.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great step forward, added a few comments on the parts I could spot on mobile and/or I'm less convinced of
build.sbt
Outdated
@@ -19,6 +19,7 @@ lazy val V = new { | |||
val scalatest = "3.2.3" | |||
val silencer = "1.7.1" | |||
val zio = "1.0.3" | |||
val `zio-s3` = "latest.integration" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Coming soon? 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val `zio-s3` = "latest.integration" | |
val `zio-s3` = "0.2.5" |
if (queryResult.results.isEmpty) timeSegment.to.plus(tumblingStep).orNow.map(TimeSegment(timeSegment.from, _)) | ||
else { | ||
val mostRecent = queryResult.results.max.instant | ||
mostRecent.plus(tumblingStep).orNow.map(TimeSegment(mostRecent, _)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid that doing it here would prevent clients from doing things like: well, once I'm 2/3 seconds wall clock time from current time (yes, they are subject to skew so it's a big assumption) I'm going to back off and run queries slower (see #44 (comment))
I'm sure we could work around this but in general removing client control from what State
makes sense to them and constrain it to be a TimeSegment
might be counter productive (e.g. a client that always needs to pull the entire table or one that needs to scan through a sortable primary key). Alternatively, we provide a factory for multiple common cases (this would be one) and allow one that is much like the original (leaving the movement of the cursor completely to the caller)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is the first part of the second scenario. I wanted to work from a specific practical case to establish a baseline. I’m going to build the more generic setup next and it will be feature parity. The reason is to optimize for (what I think will be) the common case, and then give the user freedom to write his own function if he likes.
val max = values.map(_.modifiedAt).max // if we can't order in the query we need to do it here... | ||
max.plus5Minutes.orNow.map(State(max, _)) | ||
} | ||
object Main extends zio.App { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels like we could still be composing this more minimally here in the example. There should be a way to provide all the below in the core module, bar the db parts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I didn't spend much time on this and I'll leave tidying up the layers last or even in another PR. I expect it to be rather mechanical.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also will inline keyExtract
and logTask
to make it less crowded
Co-authored-by: Julien Jean Paul Sirocchi <[email protected]>
This should have feature parity with the old front-end. Is there anything missing? If nothing missing I’ll do a general clean-up and update the README |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the caveat that the best parts of both Main
and MainGeneralized
could (and likely should) go back to core
(so all Tamer apps are initialised the same way, all log the same way, etc and in general there's less boilerplate for the end user to have to write), this is a huge 👍
We can later decide whether using a typeclass-like approach for the hashable state is better/cleaner
} | ||
|
||
val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany { | ||
deleteMeConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was to be deleted? Maybe just folding the two things into one config will do too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, I eventually renamed it but the IDE refactoring algo messed up here. I'll rename it for now. The db
module is on it's own now so it can be refactored freely in another PR.
As a first iteration I think this is ready to be merged. The only thing remaining, if you don't find anything else is to double check the name of the artifacts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few more comments. @barambani wdyt?
build.sbt
Outdated
@@ -135,15 +136,15 @@ lazy val commonSettings = Seq( | |||
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"), | |||
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")), | |||
scalacOptions ++= versionDependent(scalaVersion.value), | |||
resolvers += "confluent" at "https://packages.confluent.io/maven/" | |||
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/", "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This we can probably revert too here
import java.time.Instant | ||
import scala.util.hashing.byteswap64 | ||
|
||
object Db { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need these classes wrapped in this Db
object for some reason?
Co-authored-by: Julien Jean Paul Sirocchi <[email protected]>
build.sbt
Outdated
@@ -19,6 +19,7 @@ lazy val V = new { | |||
val scalatest = "3.2.3" | |||
val silencer = "1.7.1" | |||
val zio = "1.0.3" | |||
val `zio-s3` = "latest.integration" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val `zio-s3` = "latest.integration" | |
val `zio-s3` = "0.2.5" |
build.sbt
Outdated
@@ -135,15 +138,15 @@ lazy val commonSettings = Seq( | |||
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"), | |||
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")), | |||
scalacOptions ++= versionDependent(scalaVersion.value), | |||
resolvers += "confluent" at "https://packages.confluent.io/maven/" | |||
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/", "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which dependency needs snapshots ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove this, already removed the zio-s3 dep
ZStream | ||
.fromEffect(subscribe(sc)) | ||
.flatMap { | ||
case true => ZStream.fromEffect(log.info(s"consumer group ${cfg.state.groupId} resuming consumption from ${cfg.state.topic}")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems about time to support Zio Streams better in log-effect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that sounds like interesting work
@@ -20,7 +18,7 @@ import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscr | |||
import zio.kafka.producer.{Producer, ProducerSettings} | |||
import zio.stream.ZStream | |||
|
|||
final case class StateKey(queryHash: String, groupId: String) | |||
final case class StateKey(stateKey: String, groupId: String) | |||
|
|||
object Kafka { | |||
trait Service { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trait Service { | |
sealed trait Service { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do, should work fine, but I don't think it's idiomatic
@@ -53,17 +41,12 @@ object Config { | |||
kafkaSinkConfigValue, | |||
kafkaStateConfigValue | |||
).parMapN(Kafka) | |||
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply) | |||
|
|||
trait Service { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who's using this ? is it needed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably hereditary of when this was a module, will remove
val kafkaLayer: Layer[TamerError, Kafka] = Config.live >>> Kafka.live | ||
val transactorLayer: Layer[TamerError, DbTransactor] = (Blocking.live ++ ConfigDb.live) >>> db.hikariLayer | ||
val queryConfigLayer: Layer[TamerError, DbConfig with QueryConfig] = ConfigDb.live | ||
val defaultLayer: Layer[TamerError, DbTransactor with Kafka with QueryConfig] = transactorLayer ++ queryConfigLayer ++ kafkaLayer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these don't seem to depend on setup
or any of the type parameters so they could live outside of fetch
type DbTransactor = Has[Transactor[Task]] | ||
type TamerDBConfig = DbTransactor with QueryConfig | ||
|
||
implicit final class InstantOps(private val instant: Instant) extends AnyVal { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't look specific of the db flavour, it could be in core as an helper syntax.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it's only used there and in relation to db examples. I would leave it here since it's an hapax legomenon.
mkSetup(queryBuilder)(timeSegment, keyExtract, stateFold) | ||
} | ||
|
||
final def mkSetup[ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering we have a Setup
case class, couldn't this be its apply ?
object Setup {
final def apply[
K <: Product: Encoder: Decoder: SchemaFor,
V <: Product: Encoder: Decoder: SchemaFor,
S <: Product with HashableState: Encoder: Decoder: SchemaFor
](
queryBuilder: S => Query0[V]
)(defaultState: S, keyExtract: V => K, stateFoldM: S => QueryResult[V] => UIO[S]): Setup[K, V, S] = [...]
also, mkSetupWithTimeSegment
could be a smart constructor in the Setup
companion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, that would make ti more regular, will do.
|
||
private[this] val logTask: Task[LogWriter[Task]] = log4sFromName.provide("tamer.db") | ||
|
||
final def iteration[K <: Product, V <: Product, S <: Product with HashableState]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is iteration
expected to be part of the public api ? Considering we have fetch
and fetchWithTimeSegment
it could be private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, let's make it private for now
} yield managedTransactor | ||
} | ||
|
||
def mkTransactor(db: ConfigDb.Db, connectEC: ExecutionContext, transactEC: ExecutionContext): Managed[TamerError, HikariTransactor[Task]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to expose mkTransactor
as part of the public api ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m assuming not
This is good progress. It will be great to merge it in. |
build.sbt
Outdated
.dependsOn(tamer) | ||
.settings(commonSettings) | ||
.settings( | ||
name := "doobie", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we probably want to prefix all modules' artifacts with tamer-
now that we no longer have only one. This means
name := "doobie", | |
name := "tamer-doobie", |
And likely tamer-core
the other one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As opposed to just just tamer
, because the module in itself doesn't do much without a supporting module to fetch data. Done.
Ok, should be good. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last pass then we'll be good to go
local/runDb.sh
Outdated
SCRIPT_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" || exit; pwd -P) | ||
cd "$SCRIPT_PATH"/.. || exit | ||
|
||
sbt "example/runMain tamer.example.Main" -jvm-debug 5005 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sbt "example/runMain tamer.example.Main" -jvm-debug 5005 | |
sbt "example/runMain tamer.example.DatabaseSimple" -jvm-debug 5005 |
README.md
Outdated
|
||
### Database module | ||
|
||
Basic manual testing is available for the code in the example module `tamer.example.Main`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basic manual testing is available for the code in the example module `tamer.example.Main`. | |
Basic manual testing is available for the code in the example module `tamer.example.DatabaseSimple` (and/or `tamer.example.DatabaseGeneralized`). |
README.md
Outdated
|
||
From the `local` folder launch `docker-compose up` (you can enter `docker-compose down` | ||
if you want to start from scratch). After that you should be able to access the kafka | ||
gui from http://localhost:8000. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gui from http://localhost:8000. | |
gui from [http://localhost:8000](http://localhost:8000). |
val defaultState: S, | ||
val stateKey: Int | ||
) { | ||
def show: String = "not available, please implement the show method to display setup" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @barambani here, can we remove the default message and leave def show: String
abstract or is there a reason we're missing?
@@ -13,8 +13,6 @@ import zio.interop.catz._ | |||
import scala.concurrent.duration.FiniteDuration | |||
|
|||
object Config { | |||
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like there's two refinement types left in config
's package object that should be moved to the doobie module in the db
package object: UriString
and Password
val queryConfig: URIO[QueryConfig, Query] | ||
} | ||
|
||
val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val live: Layer[TamerError, Has[Db] with Has[Query]] = ZLayer.fromEffectMany { | |
val live: Layer[TamerError, DbConfig with QueryConfig] = ZLayer.fromEffectMany { |
|
||
trait Service { | ||
val dbConfig: URIO[DbConfig, Db] | ||
val queryConfig: URIO[QueryConfig, Query] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can all go, right?
trait Service { | |
val dbConfig: URIO[DbConfig, Db] | |
val queryConfig: URIO[QueryConfig, Query] | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically yes, but maybe it's not the best form, anyways we can refactor later to decide who deserves to be a service and who doesn't
@@ -0,0 +1,84 @@ | |||
package tamer.db |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package tamer.db | |
package tamer | |
package db |
|
||
import com.sksamuel.avro4s.{Decoder, Encoder, SchemaFor} | ||
import doobie.Query0 | ||
import tamer.Serde |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import tamer.Serde |
override val defaultState: S, | ||
keyExtract: V => K, | ||
stateFoldM: S => QueryResult[V] => UIO[S] | ||
) extends tamer.Setup[K, V, S]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this may be required
) extends tamer.Setup[K, V, S]( | |
) extends _root_.tamer.Setup[K, V, S]( |
For some reason I can't answer to #212 (comment) but the reason is that it's an advanced feature and new implementors might want to defer it instead of simply implementing it as an empty string. So it's like I’m giving them a placeholder for free. Otherwise there are two possibilities:
|
ba7ec60
to
c1097e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! And thank you! Hopefully this will be the basis for a good few source connectors!
in this spirit, does |
anyway the above is no reason to hold the pr. That's good to go. Brilliant work. |
What do you have in mind? It certainly can't be concrete now that we separated it from the db module. |
yes, something like a case class that can be used as it is for a default setup, and specialised in case the print (or future behaviours) need to be more specific. UPDATE: anyway as said I'll wait when the public api is more mature and there are other examples of user code. I still don't see what make life easier or more complicated. |
This separates database fetching in a separate module.
It adds a local folder for quick testing.