Skip to content

Commit

Permalink
zio2 (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon authored Jun 25, 2022
1 parent c751480 commit b244510
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 71 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ Test / fork := true
run / fork := true

val http4sVersion = "0.23.11"
val zioVersion = "2.0.0-RC6"
val interopVersion = "3.3.0-RC7"
val zioVersion = "2.0.0"
val interopVersion = "3.3.0"
val catsEffectVersion = "3.3.11"
//val zioNIOVersion = "1.0.0-RC11"
val prometheusVersion = "0.12.0"
Expand Down
23 changes: 17 additions & 6 deletions docs/essentials/dropwizard.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ You can run and verify the results so:

```scala mdoc:silent
val name = MetricRegistry.name("DropwizardCounter", Array("test", "counter"): _*)
val r = rt.unsafeRun(testCounter)
val r = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testCounter).getOrThrow()
}
val cs = r.getCounters()
val c = if (cs.get(name) == null) 0 else cs.get(name).getCount
```
Expand All @@ -161,7 +163,9 @@ You can run and verify the results so:

```scala mdoc:silent
val gaugeName = MetricRegistry.name("DropwizardGauge", Array("test", "gauge"): _*)
val rGauge = rt.unsafeRun(testGauge)
val rGauge = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testGauge).getOrThrow()
}
val gs = rGauge._1.getGauges()
val g = if (gs.get(gaugeName) == null) Long.MaxValue else gs.get(gaugeName).getValue().asInstanceOf[Long]
```
Expand All @@ -176,7 +180,9 @@ results:
_ <- printLine(j.spaces2)
} yield ()

rt.unsafeRun(str)
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(str).getOrThrow()
}
```

Let's discuss `Reporters` next.
Expand Down Expand Up @@ -222,8 +228,11 @@ Let's combine a couple of them:
} yield r

def run(args: List[String]) = {
val json = rt.unsafeRun(tests >>= (r =>
DropwizardExtractor.writeJson(r)(None))) // JSON Registry Printer
val json = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(tests >>= (r =>
DropwizardExtractor.writeJson(r)(None))) // JSON Registry Printer
.getOrThrow()
}
RIO.sleep(Duration.fromScala(60.seconds))
printLine(json.spaces2).map(_ => 0)
}
Expand Down Expand Up @@ -331,7 +340,9 @@ As usual, you can verify its data directly:

```scala mdoc:silent
val timerName = MetricRegistry.name("DropwizardTimer", Array("test", "timer"): _*)
val rTimer = rt.unsafeRun(testTimer)
val rTimer = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testTimer).getOrThrow()
}
val meanRate = rTimer._1
.getTimers()
.get(timerName)
Expand Down
59 changes: 40 additions & 19 deletions docs/essentials/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ You can run and verify the results so:
val timeSeriesNames = new util.HashSet[String]() {
add("simple_counter_total")
}
val r = rt.unsafeRun(testCounter)
val r = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testCounter).getOrThrow()
}
val count = r
.filteredMetricFamilySamples(timeSeriesNames)
.nextElement()
Expand Down Expand Up @@ -218,7 +220,9 @@ And to run and verify the result:
```scala mdoc:silent
val setG: util.Set[String] = new util.HashSet[String]()
setG.add("simple_gauge")
val rG = rt.unsafeRun(testGauge)
val rG = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testGauge).getOrThrow()
}
val a1 = rG._1
.filteredMetricFamilySamples(setG)
.nextElement()
Expand Down Expand Up @@ -271,7 +275,9 @@ given registry. Let's look at an example of how all this works.
} yield hs

def main(args: Array[String]): Unit =
rt.unsafeRun(exporterTest >>= (server => printLine(s"Server port: ${server.getPort()}")))
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(exporterTest >>= (server => printLine(s"Server port: ${server.getPort()}"))).getOrThrow()
}
```

Where `>>=` = `flatMap`. Also note that `exporters` refers to the helper object
Expand Down Expand Up @@ -319,14 +325,18 @@ You can, of course, verify the usual way:
setHT.add("simple_histogram_timer_count")
setHT.add("simple_histogram_timer_sum")

val rht = rt.unsafeRun(testHistogramTimer)
val rht = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testHistogramTimer).getOrThrow()
}
val cnt = rht.filteredMetricFamilySamples(setHT).nextElement().samples.get(0).value
val sum = rht.filteredMetricFamilySamples(setHT).nextElement().samples.get(1).value
```

or simpler using our `exporters` helper:
```scala mdoc:silent
val rhtE = rt.unsafeRun(testHistogramTimerHelper)
val rhtE = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testHistogramTimerHelper).getOrThrow()
}
write004(rhtE)
```

Expand Down Expand Up @@ -410,15 +420,19 @@ acquisition, use, and release of the timer and the task execution.
Now lets inspect our values by `tap`ping our `RIO`.

```scala mdoc:silent
rt.unsafeRun(testHistogramDuration.tap(r => write004(r).map(println)))
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testHistogramDuration.tap(r => write004(r).map(println))).getOrThrow()
}
```

Please note there's no reason to use `tap` here, its just to demonstrate that we
are returning `RIO`s which means we have at our disposal all its combinators. We
might as well just use

```
val rhd = rt.unsafeRun(testHistogramDuration)
val rhd = Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(testHistogramDuration).getOrThrow()
}
write004(rhd)
```

Expand Down Expand Up @@ -513,12 +527,15 @@ simply expose methods to use them in your `MeasuringPoints` through the Layer:

val live: Layer[Nothing, Metrics] = ZLayer.succeed(new Service {

private val (myCounter, myHistogram) = rtLayer.unsafeRun(
for {
c <- counter.register("myCounter", Array("name", "method"))
h <- histogram.register("myHistogram", Array("name", "method"))
} yield (c, h)
)
private val (myCounter, myHistogram) =
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(
for {
c <- counter.register("myCounter", Array("name", "method"))
h <- histogram.register("myHistogram", Array("name", "method"))
} yield (c, h)
).getOrThrow()
}

def getRegistry(): Task[CollectorRegistry] =
getCurrentRegistry().provideLayer(Registry.live)
Expand Down Expand Up @@ -560,7 +577,9 @@ And then we can use it so:
val programL = exporterTest >>= (server => printLine(s"Server port: ${server.getPort()}"))

def main(args: Array[String]): Unit =
rtLayer.unsafeRun(programL.provideSomeLayer[Env](Metrics.live))
Unsafe.unsafeCompat { implicit u =>
rtLayer.unsafe.run(programL.provideSomeLayer[Env](Metrics.live)).getOrThrow()
}
```

The key of this approach is that even if you register only one counter, you can
Expand All @@ -572,17 +591,17 @@ counters as combination of tags you can devise for your app. Your
use it.

The main drawback, as far as I can tell, is that you need that extra call to
`unsafeRun` in order to extract and use the metrics themselves. If you don't,
`Runtime.unsafe.run` in order to extract and use the metrics themselves. If you don't,
then every call to `inc` or `time` would attemp to re-register the metric which
results in an Exception. This is because things wrapped up in `ZIO`s are
descriptions of a program so flatmapping or folding on them causes them to
start their execution flow from the beginning.

Although the ideal is to call `unsafeRun` only once in your App, this is only an
Although the ideal is to call `Runtime.unsafe.run` only once in your App, this is only an
ideal and calling it a second (or even third) time is OK as long as you do not
abuse its usage.

You can, however, eliminate this extra `unsafeRun` by registering your `counter`
You can, however, eliminate this extra `Runtime.unsafe.run` by registering your `counter`
and `histogram` during startup and pass them as inputs to your `ZLayer` using
`fromFunction`:

Expand All @@ -604,7 +623,7 @@ and `histogram` during startup and pass them as inputs to your `ZLayer` using
```

The second approach is somewhat more generic and doesn't need extra calls to
`unsafeRun` but it requires the use of `PartialFunction`s and keeping a private
`Runtime.unsafe.run` but it requires the use of `PartialFunction`s and keeping a private
`Map` inside our custom Layer, here called `MetricsMap`:

```scala
Expand Down Expand Up @@ -706,5 +725,7 @@ and then usinig them downstream wherever you need:
val programMM = startup *> exporterTest >>= (server => printLine(s"Server port: ${server.getPort()}"))

def main(args: Array[String]): Unit =
rt.unsafeRunMM(programMM)
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(programMM).getOrThrow()
}
```
40 changes: 23 additions & 17 deletions docs/essentials/statsd.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ We can now run our sample client so:

```scala mdoc:silent
def main(args: Array[String]): Unit = {
rt.unsafeRun(program >>= (lst => printLine(s"Main: $lst")))
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(program >>= (lst => printLine(s"Main: $lst"))).getOrThrow()
}
}
```

Expand Down Expand Up @@ -336,15 +338,17 @@ We can reuse `rt`, the runtime created earlier to run our `program`:
```scala mdoc:silent
def main1(args: Array[String]): Unit = {
val timeouts = Seq(34L, 76L, 52L)
rt.unsafeRun(
ZIO.scoped {
createStatsDClient.flatMap { statsDClient =>
RIO
.foreach(timeouts)(l => program(l)(statsDClient))
.repeat(schd)
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(
ZIO.scoped {
createStatsDClient.flatMap { statsDClient =>
RIO
.foreach(timeouts)(l => program(l)(statsDClient))
.repeat(schd)
}
}
}
)
).getOrThrow()
}
Thread.sleep(10000) // wait for all messages to be consumed
}
```
Expand Down Expand Up @@ -380,15 +384,17 @@ create a new runtime to support it.

def main2(args: Array[String]): Unit = {
val timeouts = Seq(34L, 76L, 52L)
rtDog.unsafeRun(
ZIO.scoped {
createDogStatsDClient.flatMap { dogStatsDClient =>
RIO
.foreach(timeouts)(l => dogProgram(l)(dogStatsDClient))
.repeat(schd)
Unsafe.unsafeCompat { implicit u =>
rtDog.unsafe.run(
ZIO.scoped {
createDogStatsDClient.flatMap { dogStatsDClient =>
RIO
.foreach(timeouts)(l => dogProgram(l)(dogStatsDClient))
.repeat(schd)
}
}
}
)
).getOrThrow()
}
Thread.sleep(10000)
}

Expand Down
20 changes: 9 additions & 11 deletions dropwizard/src/main/scala/zio/metrics/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ object Server {

def builder[Ctx]: KleisliApp => HttpTask[Unit] =
(app: KleisliApp) =>
ZIO
.runtime[HttpEnvironment]
.flatMap { implicit rts =>
BlazeServerBuilder[HttpTask]
.withExecutionContext(rts.executor.asExecutionContext)
.bindHttp(port)
.withHttpApp(app)
.serve
.compile
.drain
}
ZIO.executor.flatMap { executor =>
BlazeServerBuilder[HttpTask]
.withExecutionContext(executor.asExecutionContext)
.bindHttp(port)
.withHttpApp(app)
.serve
.compile
.drain
}

def serveMetrics: MetricRegistry => HttpRoutes[Server.HttpTask] =
registry =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.prometheus.client.{ Counter => PCounter }
import io.prometheus.client.exporter.HTTPServer
import zio.Console
import zio.Console.printLine
import zio.Unsafe

object ExplicitRegistryLayer {

Expand All @@ -24,7 +25,9 @@ object ExplicitRegistryLayer {

val myCustomLayer = ZLayer.succeed(myRegistry) >>> Registry.explicit

val rt = Runtime.unsafeFromLayer(MetricMap.live ++ Exporters.live)
val rt = Unsafe.unsafeCompat { implicit u =>
Runtime.unsafe.fromLayer(MetricMap.live ++ Exporters.live)
}

type MetricMap = MetricMap.Service

Expand Down Expand Up @@ -113,5 +116,7 @@ object ExplicitRegistryLayer {
val program = startup *> exporterTest flatMap (server => Console.printLine(s"Server port: ${server.getPort()}"))

def main(args: Array[String]): Unit =
rt.unsafeRun(program)
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(program).getOrThrow()
}
}
10 changes: 7 additions & 3 deletions prometheus/src/test/scala/zio/metrics/MetricMapLayer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import zio.metrics.prometheus._
import zio.metrics.prometheus.helpers._
import zio.metrics.prometheus.exporters.Exporters
import io.prometheus.client.exporter.HTTPServer
import zio.{ Layer, ZLayer }
import zio.{ Layer, Unsafe, ZLayer }
import zio.{ IO, RIO, Task, ZIO }
import io.prometheus.client.CollectorRegistry
import zio.Console.printLine

object MetricMapLayer {

val rt = Runtime.unsafeFromLayer(MetricMap.live ++ Registry.live ++ Exporters.live)
val rt = Unsafe.unsafeCompat { implicit u =>
Runtime.unsafe.fromLayer(MetricMap.live ++ Registry.live ++ Exporters.live)
}

type MetricMap = MetricMap.Service

Expand Down Expand Up @@ -101,5 +103,7 @@ object MetricMapLayer {
val program = startup *> exporterTest flatMap (server => printLine(s"Server port: ${server.getPort()}"))

def main(args: Array[String]): Unit =
rt.unsafeRun(program)
Unsafe.unsafeCompat { implicit u =>
rt.unsafe.run(program).getOrThrow()
}
}
Loading

0 comments on commit b244510

Please sign in to comment.