Skip to content

Commit

Permalink
Add Permit type like Semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
geny200 committed Sep 6, 2024
1 parent 76fdd21 commit 4ced943
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 18 deletions.
14 changes: 10 additions & 4 deletions modules/core/ce2/src/main/scala/tofu/interop/CE2Agents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package tofu.interop
import cats.{Functor, Monad}
import cats.effect.concurrent.{Ref, Semaphore}
import tofu.Fire
import tofu.concurrent.{Agent, SerialAgent}
import cats.syntax.all._
import tofu.concurrent.{Agent, Permit, SerialAgent}
import cats.syntax.all.*
import tofu.lift.Lift
import tofu.syntax.fire._
import tofu.syntax.liftKernel._
import tofu.syntax.fire.*
import tofu.syntax.liftKernel.*

/** Default implementation of [[tofu.concurrent.Agent]] that consists of [[cats.effect.concurrent.Ref]] and
* [[cats.effect.concurrent.Semaphore]]
Expand Down Expand Up @@ -38,6 +38,12 @@ final case class SerialSemRef[F[_]: Monad, A](ref: Ref[F, A], sem: Semaphore[F])
modifyM(a => if (f.isDefinedAt(a)) f(a) else (default, a).pure[F])
}

/** Default implementation of [[tofu.concurrent.Permit]] that use [[cats.effect.concurrent.Semaphore]]
*/
final case class PermitSem[F[_]](sem: Semaphore[F]) extends Permit[F] {
def withPermit[A](fa: F[A]): F[A] = sem.withPermit(fa)
}

/** If instances of [[cats.effect.concurrent.Ref]] and [[cats.effect.concurrent.Semaphore]] can not be created for some
* `G[_]`, but can be created for some `F[_]`, for which an instance of [[tofu.lift.Lift]] `Lift[F, G]` is present,
* this implementation can be used
Expand Down
10 changes: 10 additions & 0 deletions modules/core/ce2/src/main/scala/tofu/interop/CE2Kernel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ object CE2Kernel {
} yield UnderlyingSemRef[F, G, A](ref, sem)
}

final def permitBySemaphore[I[_]: Monad, F[_]](implicit
makeSemaphore: MakeSemaphore[I, F]
): MkPermitCE2Carrier[I, F] =
new MkPermitCE2Carrier[I, F] {
override def permitOf(limit: Long): I[Permit[F]] =
makeSemaphore
.semaphore(limit)
.map(PermitSem(_))
}

def boundedParallel[F[_]: Async: Parallel]: BoundedParallelCarrierCE2[F] = new BoundedParallelCarrierCE2.Impl[F] {
def parTraverse[T[_]: Traverse, A, B](in: T[A])(f: A => F[B]): F[T[B]] =
Parallel.parTraverse(in)(f)
Expand Down
47 changes: 47 additions & 0 deletions modules/core/ce2/src/test/scala/tofu/concurrent/PermitSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tofu.concurrent

import cats.data.ReaderT
import cats.effect.concurrent.Deferred
import cats.effect.syntax.concurrent.*
import cats.effect.{Concurrent, ContextShift, IO, Sync}
import cats.syntax.applicativeError.*
import org.scalatest.funsuite.AnyFunSuite
import tofu.compat.unused
import tofu.syntax.monadic.*

import scala.concurrent.ExecutionContext

class PermitSuite extends AnyFunSuite {

private implicit val ioCS: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

@unused
private def summonInstance[I[_]: Sync, F[_]: Concurrent]: MakePermit[I, F] =
implicitly[MakePermit[I, F]]

test("check IO has Permit") {
assert(
MakePermit[IO, ReaderT[IO, Unit, _]]
.of(2)
.flatMap(permitProg(_).run(()))
.unsafeRunSync() === Left(())
)
}

private def permitProg[F[_]: Concurrent](permit: Permit[F]): F[Either[Unit, Unit]] =
for {
wait1 <- Deferred[F, Unit]
wait2 <- Deferred[F, Unit]
waitEnd <- Deferred[F, Unit]
fiber1 <- permit.withPermit(wait1.complete(()) >> waitEnd.get).start
fiber2 <- permit.withPermit(wait2.complete(()) >> waitEnd.get).start
_ <- wait1.get
_ <- wait2.get
// fiber3 will be blocked, and throw an IllegalStateException when fiber1 or fiber2 will complete
fiber3 <- permit.withPermit(waitEnd.complete(()).attempt).start
resultFiber <- (fiber1.join >> fiber2.join >> fiber3.join).start
_ <- waitEnd.complete(())
result <- resultFiber.join
} yield result.left.map(_ => ())

}
14 changes: 10 additions & 4 deletions modules/core/ce3/src/main/scala/tofu/interop/CE3Agents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package tofu.interop
import cats.effect.Ref
import cats.effect.kernel.MonadCancelThrow
import cats.effect.std.Semaphore
import cats.syntax.all.*
import cats.{Functor, Monad}
import tofu.Fire
import tofu.concurrent.{Agent, SerialAgent}
import cats.syntax.all._
import tofu.concurrent.{Agent, Permit, SerialAgent}
import tofu.lift.Lift
import tofu.syntax.fire._
import tofu.syntax.liftKernel._
import tofu.syntax.fire.*
import tofu.syntax.liftKernel.*

/** Default implementation of [[tofu.concurrent.Agent]] that consists of [[cats.effect.Ref]] and
* [[cats.effect.std.Semaphore]]
Expand Down Expand Up @@ -40,6 +40,12 @@ final case class SerialSemRef[F[_]: MonadCancelThrow, A](ref: Ref[F, A], sem: Se
modifyM(a => if (f.isDefinedAt(a)) f(a) else (default, a).pure[F])
}

/** Default implementation of [[tofu.concurrent.Permit]] that use [[cats.effect.std.Semaphore]]
*/
final case class PermitSem[F[_]: MonadCancelThrow](sem: Semaphore[F]) extends Permit[F] {
def withPermit[A](fa: F[A]): F[A] = sem.permit.use(_ => fa)
}

/** If instances of [[cats.effect.Ref]] and [[cats.effect.std.Semaphore]] can not be created for some `G[_]`, but can be
* created for some `F[_]`, for which an instance of [[tofu.lift.Lift]] `Lift[F, G]` is present, this implementation
* can be used
Expand Down
27 changes: 18 additions & 9 deletions modules/core/ce3/src/main/scala/tofu/interop/CE3Kernel.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package tofu.interop

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

import cats.effect.kernel._
import cats.effect.kernel.*
import cats.effect.std.Dispatcher
import cats.effect.unsafe.IORuntime
import cats.effect.{Async, Fiber, IO, Sync}
import cats.{Functor, Monad, Parallel, Traverse}
import tofu.compat.unused
import tofu.concurrent._
import tofu.concurrent.*
import tofu.concurrent.impl.QVarSM
import tofu.internal.NonTofu
import tofu.internal.carriers._
import tofu.internal.carriers.*
import tofu.lift.Lift
import tofu.syntax.monadic._
import tofu.syntax.monadic.*
import tofu.{Fire, Scoped, WithContext}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

object CE3Kernel {
def delayViaSync[K[_]](implicit KS: Sync[K]): DelayCarrier3[K] =
new DelayCarrier3[K] {
Expand Down Expand Up @@ -165,6 +164,16 @@ object CE3Kernel {
} yield UnderlyingSemRef[F, G, A](ref, sem)
}

final def permitBySemaphore[I[_]: Monad, F[_]: MonadCancelThrow](implicit
makeSemaphore: MakeSemaphore[I, F]
): MkPermitCE3Carrier[I, F] =
new MkPermitCE3Carrier[I, F] {
override def permitOf(limit: Long): I[Permit[F]] =
makeSemaphore
.semaphore(limit)
.map(new PermitSem[F](_))
}

def boundedParallel[F[_]: Concurrent: Parallel]: BoundedParallelCarrierCE3[F] =
new BoundedParallelCarrierCE3.Impl[F] {
def parTraverse[T[_]: Traverse, A, B](in: T[A])(f: A => F[B]): F[T[B]] =
Expand Down
44 changes: 44 additions & 0 deletions modules/core/ce3/src/test/scala/tofu/concurrent/PermitSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tofu.concurrent

import cats.data.ReaderT
import cats.effect.kernel.Deferred
import cats.effect.syntax.spawn.*
import cats.effect.unsafe.IORuntime
import cats.effect.{Async, Concurrent, IO, Sync}
import org.scalatest.funsuite.AnyFunSuite
import tofu.compat.unused
import tofu.syntax.monadic.*

class PermitSuite extends AnyFunSuite {
private implicit val iort: IORuntime = IORuntime.global

@unused
private def summonInstance[I[_]: Sync, F[_]: Async]: MakePermit[I, F] =
implicitly[MakePermit[I, F]]

test("check IO has Atom") {
assert(
MakePermit[IO, ReaderT[IO, Unit, _]]
.of(2)
.flatMap(permitProg(_).run(()))
.unsafeRunSync() === false
)
}

private def permitProg[F[_]: Concurrent](permit: Permit[F]): F[Boolean] =
for {
wait1 <- Deferred[F, Unit]
wait2 <- Deferred[F, Unit]
waitEnd <- Deferred[F, Unit]
fiber1 <- permit.withPermit(wait1.complete(()) >> waitEnd.get).start
fiber2 <- permit.withPermit(wait2.complete(()) >> waitEnd.get).start
_ <- wait1.get
_ <- wait2.get
// fiber3 will be blocked, and return false (on complete) when fiber1 or fiber2 will complete
fiber3 <- permit.withPermit(waitEnd.complete(())).start
resultFiber <- (fiber1.join >> fiber2.join >> fiber3.join).flatMap(_.embedNever).start
_ <- waitEnd.complete(())
result <- resultFiber.join.flatMap(_.embedNever)
} yield result

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,19 @@ trait MkSerialAgentCE3Carrier0Macro {
F
], I, F, { val `tofu.interop.CE3Kernel.underlyingSerialAgentByRefAndSemaphore`: Unit }]
}

trait MkPermitCE2CarrierMacro {
final implicit def interopCE2Carrier[I[_], F[_]]: MkPermitCE2Carrier[I, F] =
macro Interop.delegate2[MkPermitCE2Carrier[
I,
F
], I, F, { val `tofu.interop.CE2Kernel.permitBySemaphore`: Unit }]
}

trait MkPermitCE3CarrierMacro {
final implicit def interopCE3Carrier[I[_], F[_]]: MkPermitCE3Carrier[I, F] =
macro Interop.delegate2[MkPermitCE3Carrier[
I,
F
], I, F, { val `tofu.interop.CE3Kernel.permitBySemaphore`: Unit }]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package tofu.internal.instances

import tofu.concurrent.MakePermit
import tofu.internal.carriers.{MkPermitCE2Carrier, MkPermitCE3Carrier}

private[tofu] trait MakePermitInstance extends MakePermitInstance0 {
final implicit def interopCE3[I[_], F[_]](implicit carrier: MkPermitCE3Carrier[I, F]): MakePermit[I, F] =
carrier
}

private[tofu] trait MakePermitInstance0 {
final implicit def interopCE2[I[_], F[_]](implicit carrier: MkPermitCE2Carrier[I, F]): MakePermit[I, F] =
carrier
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ trait MkSerialAgentCE3Carrier0Macro:
Interop.delegate2[I, F, MkSerialAgentCE3Carrier[I, F]](
"tofu.interop.CE3Kernel.underlyingSerialAgentByRefAndSemaphore"
)

trait MkPermitCE2CarrierMacro:
inline given interopCE2Carrier[I[_], F[_]]: MkPermitCE2Carrier[I, F] =
Interop.delegate2[I, F, MkPermitCE2Carrier[I, F]]("tofu.interop.CE2Kernel.permitBySemaphore")

trait MkPermitCE3CarrierMacro:
inline given underlyinginteropCE3Carrier[I[_], F[_]]: MkPermitCE3Carrier[I, F] =
Interop.delegate2[I, F, MkPermitCE3Carrier[I, F]](
"tofu.interop.CE3Kernel.permitBySemaphore"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package tofu.internal
package instances

import scala.compiletime.summonFrom
import tofu.concurrent.MakePermit
import tofu.internal.carriers.{MkPermitCE2Carrier, MkPermitCE3Carrier}

private[tofu] trait MakePermitInstance:
inline given [I[_], F[_]]: MakePermit[I, F] = summonFrom {
case carrier: MkPermitCE2Carrier[I, F] => carrier
case carrier: MkPermitCE3Carrier[I, F] => carrier
}
58 changes: 58 additions & 0 deletions modules/kernel/src/main/scala/tofu/concurrent/Permit.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package tofu.concurrent

import tofu.internal.instances.MakePermitInstance

/** A purely functional semaphore. A semaphore has a non-negative number of permits available. Acquiring a permit
* decrements the current number of permits and releasing a permit increases the current number of permits. An acquire
* that occurs when there are no permits available results in semantic blocking until a permit becomes available.
* Blocking `withPermit` are cancelable.
*/
trait Permit[F[_]] {

/** Returns an effect that acquires a permit, runs the supplied effect, and then releases the permit. The returned
* effect semantically blocks until permit are available. Note that acquires are statisfied in strict FIFO order.
*/
def withPermit[A](fa: F[A]): F[A]

}

object Permit {
type Make[F[_]] = MakePermit[F, F]

/** A helper for creating instances of [[tofu.concurrent.Permit]] that use the same effect during construction and
* work. If you want to use different effect to construct `Permit` use [[tofu.concurrent.MakePermit]]
*/
def Make[F[_]](implicit makePermit: Make[F]): MakePermit.PermitApplier[F, F] =
new MakePermit.PermitApplier[F, F](makePermit)
}

/** A creator of [[tofu.concurrent.Permit]] that supports effectful construction
* @tparam I
* effect for creation of agent
* @tparam F
* effect on which agent will run
*/
trait MakePermit[I[_], F[_]] {

/** Creates instance of [[tofu.concurrent.Permit]], initialized with `limit` available permits
*
* @param limit
* maximum concurrent permits
* @return
* `I[ Permit[F] ]`
*/
def permitOf(limit: Long): I[Permit[F]]
}

/** A helper for creating instances of [[tofu.concurrent.Permit]] that use different effects during construction and
* work. If you want to use same effect to construct and run `Permit` use [[tofu.concurrent.Permit.Make]]
*/
object MakePermit extends MakePermitInstance {

def apply[I[_], F[_]](implicit mkPermit: MakePermit[I, F]): PermitApplier[I, F] =
new PermitApplier[I, F](mkPermit)

final class PermitApplier[I[_], F[_]](private val mkPermit: MakePermit[I, F]) extends AnyVal {
def of(limit: Long): I[Permit[F]] = mkPermit.permitOf(limit)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package tofu.internal.carriers

import tofu.concurrent.{MakeAgent, MakeAtom, MakeQVar, MakeSerialAgent}
import tofu.concurrent.{MakeAgent, MakeAtom, MakePermit, MakeQVar, MakeSerialAgent}

trait MkAtomCE2Carrier[I[_], F[_]] extends MakeAtom[I, F]

Expand Down Expand Up @@ -33,3 +33,11 @@ object MkSerialAgentCE2Carrier extends MkSerialAgentCE2CarrierMacro
trait MkSerialAgentCE3Carrier[I[_], F[_]] extends MakeSerialAgent[I, F]

object MkSerialAgentCE3Carrier extends MkSerialAgentCE3CarrierMacro

trait MkPermitCE2Carrier[I[_], F[_]] extends MakePermit[I, F]

object MkPermitCE2Carrier extends MkPermitCE2CarrierMacro

trait MkPermitCE3Carrier[I[_], F[_]] extends MakePermit[I, F]

object MkPermitCE3Carrier extends MkPermitCE3CarrierMacro

0 comments on commit 4ced943

Please sign in to comment.