From 4ced943a5ff375cba49ce6a1b08f1bdcf996ab94 Mon Sep 17 00:00:00 2001 From: Eugene Konovalov <50055047+geny200@users.noreply.github.com> Date: Fri, 6 Sep 2024 14:30:11 +0300 Subject: [PATCH] Add Permit type like Semaphore --- .../main/scala/tofu/interop/CE2Agents.scala | 14 +++-- .../main/scala/tofu/interop/CE2Kernel.scala | 10 ++++ .../scala/tofu/concurrent/PermitSuite.scala | 47 +++++++++++++++ .../main/scala/tofu/interop/CE3Agents.scala | 14 +++-- .../main/scala/tofu/interop/CE3Kernel.scala | 27 ++++++--- .../scala/tofu/concurrent/PermitSuite.scala | 44 ++++++++++++++ .../internal/carriers/concurrentMacro.scala | 16 +++++ .../instances/MakePermitInstance.scala | 14 +++++ .../internal/carriers/concurrentMacro.scala | 10 ++++ .../instances/MakePermitInstance.scala | 12 ++++ .../main/scala/tofu/concurrent/Permit.scala | 58 +++++++++++++++++++ .../tofu/internal/carriers/concurrent.scala | 10 +++- 12 files changed, 258 insertions(+), 18 deletions(-) create mode 100644 modules/core/ce2/src/test/scala/tofu/concurrent/PermitSuite.scala create mode 100644 modules/core/ce3/src/test/scala/tofu/concurrent/PermitSuite.scala create mode 100644 modules/kernel/src/main/scala-2/tofu/internal/instances/MakePermitInstance.scala create mode 100644 modules/kernel/src/main/scala-3/tofu/internal/instances/MakePermitInstance.scala create mode 100644 modules/kernel/src/main/scala/tofu/concurrent/Permit.scala diff --git a/modules/core/ce2/src/main/scala/tofu/interop/CE2Agents.scala b/modules/core/ce2/src/main/scala/tofu/interop/CE2Agents.scala index 053d98149..558ceefb3 100644 --- a/modules/core/ce2/src/main/scala/tofu/interop/CE2Agents.scala +++ b/modules/core/ce2/src/main/scala/tofu/interop/CE2Agents.scala @@ -3,11 +3,11 @@ package tofu.interop import cats.{Functor, Monad} import cats.effect.concurrent.{Ref, Semaphore} import tofu.Fire -import tofu.concurrent.{Agent, SerialAgent} -import cats.syntax.all._ +import tofu.concurrent.{Agent, Permit, SerialAgent} +import cats.syntax.all.* import tofu.lift.Lift -import tofu.syntax.fire._ -import tofu.syntax.liftKernel._ +import tofu.syntax.fire.* +import tofu.syntax.liftKernel.* /** Default implementation of [[tofu.concurrent.Agent]] that consists of [[cats.effect.concurrent.Ref]] and * [[cats.effect.concurrent.Semaphore]] @@ -38,6 +38,12 @@ final case class SerialSemRef[F[_]: Monad, A](ref: Ref[F, A], sem: Semaphore[F]) modifyM(a => if (f.isDefinedAt(a)) f(a) else (default, a).pure[F]) } +/** Default implementation of [[tofu.concurrent.Permit]] that use [[cats.effect.concurrent.Semaphore]] + */ +final case class PermitSem[F[_]](sem: Semaphore[F]) extends Permit[F] { + def withPermit[A](fa: F[A]): F[A] = sem.withPermit(fa) +} + /** If instances of [[cats.effect.concurrent.Ref]] and [[cats.effect.concurrent.Semaphore]] can not be created for some * `G[_]`, but can be created for some `F[_]`, for which an instance of [[tofu.lift.Lift]] `Lift[F, G]` is present, * this implementation can be used diff --git a/modules/core/ce2/src/main/scala/tofu/interop/CE2Kernel.scala b/modules/core/ce2/src/main/scala/tofu/interop/CE2Kernel.scala index 75146d651..b20e722b0 100644 --- a/modules/core/ce2/src/main/scala/tofu/interop/CE2Kernel.scala +++ b/modules/core/ce2/src/main/scala/tofu/interop/CE2Kernel.scala @@ -165,6 +165,16 @@ object CE2Kernel { } yield UnderlyingSemRef[F, G, A](ref, sem) } + final def permitBySemaphore[I[_]: Monad, F[_]](implicit + makeSemaphore: MakeSemaphore[I, F] + ): MkPermitCE2Carrier[I, F] = + new MkPermitCE2Carrier[I, F] { + override def permitOf(limit: Long): I[Permit[F]] = + makeSemaphore + .semaphore(limit) + .map(PermitSem(_)) + } + def boundedParallel[F[_]: Async: Parallel]: BoundedParallelCarrierCE2[F] = new BoundedParallelCarrierCE2.Impl[F] { def parTraverse[T[_]: Traverse, A, B](in: T[A])(f: A => F[B]): F[T[B]] = Parallel.parTraverse(in)(f) diff --git a/modules/core/ce2/src/test/scala/tofu/concurrent/PermitSuite.scala b/modules/core/ce2/src/test/scala/tofu/concurrent/PermitSuite.scala new file mode 100644 index 000000000..512800aab --- /dev/null +++ b/modules/core/ce2/src/test/scala/tofu/concurrent/PermitSuite.scala @@ -0,0 +1,47 @@ +package tofu.concurrent + +import cats.data.ReaderT +import cats.effect.concurrent.Deferred +import cats.effect.syntax.concurrent.* +import cats.effect.{Concurrent, ContextShift, IO, Sync} +import cats.syntax.applicativeError.* +import org.scalatest.funsuite.AnyFunSuite +import tofu.compat.unused +import tofu.syntax.monadic.* + +import scala.concurrent.ExecutionContext + +class PermitSuite extends AnyFunSuite { + + private implicit val ioCS: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + + @unused + private def summonInstance[I[_]: Sync, F[_]: Concurrent]: MakePermit[I, F] = + implicitly[MakePermit[I, F]] + + test("check IO has Permit") { + assert( + MakePermit[IO, ReaderT[IO, Unit, _]] + .of(2) + .flatMap(permitProg(_).run(())) + .unsafeRunSync() === Left(()) + ) + } + + private def permitProg[F[_]: Concurrent](permit: Permit[F]): F[Either[Unit, Unit]] = + for { + wait1 <- Deferred[F, Unit] + wait2 <- Deferred[F, Unit] + waitEnd <- Deferred[F, Unit] + fiber1 <- permit.withPermit(wait1.complete(()) >> waitEnd.get).start + fiber2 <- permit.withPermit(wait2.complete(()) >> waitEnd.get).start + _ <- wait1.get + _ <- wait2.get + // fiber3 will be blocked, and throw an IllegalStateException when fiber1 or fiber2 will complete + fiber3 <- permit.withPermit(waitEnd.complete(()).attempt).start + resultFiber <- (fiber1.join >> fiber2.join >> fiber3.join).start + _ <- waitEnd.complete(()) + result <- resultFiber.join + } yield result.left.map(_ => ()) + +} diff --git a/modules/core/ce3/src/main/scala/tofu/interop/CE3Agents.scala b/modules/core/ce3/src/main/scala/tofu/interop/CE3Agents.scala index b0e2984ac..55e12cd4d 100644 --- a/modules/core/ce3/src/main/scala/tofu/interop/CE3Agents.scala +++ b/modules/core/ce3/src/main/scala/tofu/interop/CE3Agents.scala @@ -3,13 +3,13 @@ package tofu.interop import cats.effect.Ref import cats.effect.kernel.MonadCancelThrow import cats.effect.std.Semaphore +import cats.syntax.all.* import cats.{Functor, Monad} import tofu.Fire -import tofu.concurrent.{Agent, SerialAgent} -import cats.syntax.all._ +import tofu.concurrent.{Agent, Permit, SerialAgent} import tofu.lift.Lift -import tofu.syntax.fire._ -import tofu.syntax.liftKernel._ +import tofu.syntax.fire.* +import tofu.syntax.liftKernel.* /** Default implementation of [[tofu.concurrent.Agent]] that consists of [[cats.effect.Ref]] and * [[cats.effect.std.Semaphore]] @@ -40,6 +40,12 @@ final case class SerialSemRef[F[_]: MonadCancelThrow, A](ref: Ref[F, A], sem: Se modifyM(a => if (f.isDefinedAt(a)) f(a) else (default, a).pure[F]) } +/** Default implementation of [[tofu.concurrent.Permit]] that use [[cats.effect.std.Semaphore]] + */ +final case class PermitSem[F[_]: MonadCancelThrow](sem: Semaphore[F]) extends Permit[F] { + def withPermit[A](fa: F[A]): F[A] = sem.permit.use(_ => fa) +} + /** If instances of [[cats.effect.Ref]] and [[cats.effect.std.Semaphore]] can not be created for some `G[_]`, but can be * created for some `F[_]`, for which an instance of [[tofu.lift.Lift]] `Lift[F, G]` is present, this implementation * can be used diff --git a/modules/core/ce3/src/main/scala/tofu/interop/CE3Kernel.scala b/modules/core/ce3/src/main/scala/tofu/interop/CE3Kernel.scala index b4c308c6a..d58df69c2 100644 --- a/modules/core/ce3/src/main/scala/tofu/interop/CE3Kernel.scala +++ b/modules/core/ce3/src/main/scala/tofu/interop/CE3Kernel.scala @@ -1,24 +1,23 @@ package tofu.interop -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} - -import cats.effect.kernel._ +import cats.effect.kernel.* import cats.effect.std.Dispatcher import cats.effect.unsafe.IORuntime import cats.effect.{Async, Fiber, IO, Sync} import cats.{Functor, Monad, Parallel, Traverse} import tofu.compat.unused -import tofu.concurrent._ +import tofu.concurrent.* import tofu.concurrent.impl.QVarSM import tofu.internal.NonTofu -import tofu.internal.carriers._ +import tofu.internal.carriers.* import tofu.lift.Lift -import tofu.syntax.monadic._ +import tofu.syntax.monadic.* import tofu.{Fire, Scoped, WithContext} +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} + object CE3Kernel { def delayViaSync[K[_]](implicit KS: Sync[K]): DelayCarrier3[K] = new DelayCarrier3[K] { @@ -165,6 +164,16 @@ object CE3Kernel { } yield UnderlyingSemRef[F, G, A](ref, sem) } + final def permitBySemaphore[I[_]: Monad, F[_]: MonadCancelThrow](implicit + makeSemaphore: MakeSemaphore[I, F] + ): MkPermitCE3Carrier[I, F] = + new MkPermitCE3Carrier[I, F] { + override def permitOf(limit: Long): I[Permit[F]] = + makeSemaphore + .semaphore(limit) + .map(new PermitSem[F](_)) + } + def boundedParallel[F[_]: Concurrent: Parallel]: BoundedParallelCarrierCE3[F] = new BoundedParallelCarrierCE3.Impl[F] { def parTraverse[T[_]: Traverse, A, B](in: T[A])(f: A => F[B]): F[T[B]] = diff --git a/modules/core/ce3/src/test/scala/tofu/concurrent/PermitSuite.scala b/modules/core/ce3/src/test/scala/tofu/concurrent/PermitSuite.scala new file mode 100644 index 000000000..7ac1818ab --- /dev/null +++ b/modules/core/ce3/src/test/scala/tofu/concurrent/PermitSuite.scala @@ -0,0 +1,44 @@ +package tofu.concurrent + +import cats.data.ReaderT +import cats.effect.kernel.Deferred +import cats.effect.syntax.spawn.* +import cats.effect.unsafe.IORuntime +import cats.effect.{Async, Concurrent, IO, Sync} +import org.scalatest.funsuite.AnyFunSuite +import tofu.compat.unused +import tofu.syntax.monadic.* + +class PermitSuite extends AnyFunSuite { + private implicit val iort: IORuntime = IORuntime.global + + @unused + private def summonInstance[I[_]: Sync, F[_]: Async]: MakePermit[I, F] = + implicitly[MakePermit[I, F]] + + test("check IO has Atom") { + assert( + MakePermit[IO, ReaderT[IO, Unit, _]] + .of(2) + .flatMap(permitProg(_).run(())) + .unsafeRunSync() === false + ) + } + + private def permitProg[F[_]: Concurrent](permit: Permit[F]): F[Boolean] = + for { + wait1 <- Deferred[F, Unit] + wait2 <- Deferred[F, Unit] + waitEnd <- Deferred[F, Unit] + fiber1 <- permit.withPermit(wait1.complete(()) >> waitEnd.get).start + fiber2 <- permit.withPermit(wait2.complete(()) >> waitEnd.get).start + _ <- wait1.get + _ <- wait2.get + // fiber3 will be blocked, and return false (on complete) when fiber1 or fiber2 will complete + fiber3 <- permit.withPermit(waitEnd.complete(())).start + resultFiber <- (fiber1.join >> fiber2.join >> fiber3.join).flatMap(_.embedNever).start + _ <- waitEnd.complete(()) + result <- resultFiber.join.flatMap(_.embedNever) + } yield result + +} diff --git a/modules/kernel/src/main/scala-2/tofu/internal/carriers/concurrentMacro.scala b/modules/kernel/src/main/scala-2/tofu/internal/carriers/concurrentMacro.scala index 3d7de48f1..177b9137c 100644 --- a/modules/kernel/src/main/scala-2/tofu/internal/carriers/concurrentMacro.scala +++ b/modules/kernel/src/main/scala-2/tofu/internal/carriers/concurrentMacro.scala @@ -65,3 +65,19 @@ trait MkSerialAgentCE3Carrier0Macro { F ], I, F, { val `tofu.interop.CE3Kernel.underlyingSerialAgentByRefAndSemaphore`: Unit }] } + +trait MkPermitCE2CarrierMacro { + final implicit def interopCE2Carrier[I[_], F[_]]: MkPermitCE2Carrier[I, F] = + macro Interop.delegate2[MkPermitCE2Carrier[ + I, + F + ], I, F, { val `tofu.interop.CE2Kernel.permitBySemaphore`: Unit }] +} + +trait MkPermitCE3CarrierMacro { + final implicit def interopCE3Carrier[I[_], F[_]]: MkPermitCE3Carrier[I, F] = + macro Interop.delegate2[MkPermitCE3Carrier[ + I, + F + ], I, F, { val `tofu.interop.CE3Kernel.permitBySemaphore`: Unit }] +} diff --git a/modules/kernel/src/main/scala-2/tofu/internal/instances/MakePermitInstance.scala b/modules/kernel/src/main/scala-2/tofu/internal/instances/MakePermitInstance.scala new file mode 100644 index 000000000..458f658e5 --- /dev/null +++ b/modules/kernel/src/main/scala-2/tofu/internal/instances/MakePermitInstance.scala @@ -0,0 +1,14 @@ +package tofu.internal.instances + +import tofu.concurrent.MakePermit +import tofu.internal.carriers.{MkPermitCE2Carrier, MkPermitCE3Carrier} + +private[tofu] trait MakePermitInstance extends MakePermitInstance0 { + final implicit def interopCE3[I[_], F[_]](implicit carrier: MkPermitCE3Carrier[I, F]): MakePermit[I, F] = + carrier +} + +private[tofu] trait MakePermitInstance0 { + final implicit def interopCE2[I[_], F[_]](implicit carrier: MkPermitCE2Carrier[I, F]): MakePermit[I, F] = + carrier +} diff --git a/modules/kernel/src/main/scala-3/tofu/internal/carriers/concurrentMacro.scala b/modules/kernel/src/main/scala-3/tofu/internal/carriers/concurrentMacro.scala index 58eb7a4f2..472d81a0f 100644 --- a/modules/kernel/src/main/scala-3/tofu/internal/carriers/concurrentMacro.scala +++ b/modules/kernel/src/main/scala-3/tofu/internal/carriers/concurrentMacro.scala @@ -45,3 +45,13 @@ trait MkSerialAgentCE3Carrier0Macro: Interop.delegate2[I, F, MkSerialAgentCE3Carrier[I, F]]( "tofu.interop.CE3Kernel.underlyingSerialAgentByRefAndSemaphore" ) + +trait MkPermitCE2CarrierMacro: + inline given interopCE2Carrier[I[_], F[_]]: MkPermitCE2Carrier[I, F] = + Interop.delegate2[I, F, MkPermitCE2Carrier[I, F]]("tofu.interop.CE2Kernel.permitBySemaphore") + +trait MkPermitCE3CarrierMacro: + inline given underlyinginteropCE3Carrier[I[_], F[_]]: MkPermitCE3Carrier[I, F] = + Interop.delegate2[I, F, MkPermitCE3Carrier[I, F]]( + "tofu.interop.CE3Kernel.permitBySemaphore" + ) diff --git a/modules/kernel/src/main/scala-3/tofu/internal/instances/MakePermitInstance.scala b/modules/kernel/src/main/scala-3/tofu/internal/instances/MakePermitInstance.scala new file mode 100644 index 000000000..5426bd0d4 --- /dev/null +++ b/modules/kernel/src/main/scala-3/tofu/internal/instances/MakePermitInstance.scala @@ -0,0 +1,12 @@ +package tofu.internal +package instances + +import scala.compiletime.summonFrom +import tofu.concurrent.MakePermit +import tofu.internal.carriers.{MkPermitCE2Carrier, MkPermitCE3Carrier} + +private[tofu] trait MakePermitInstance: + inline given [I[_], F[_]]: MakePermit[I, F] = summonFrom { + case carrier: MkPermitCE2Carrier[I, F] => carrier + case carrier: MkPermitCE3Carrier[I, F] => carrier + } diff --git a/modules/kernel/src/main/scala/tofu/concurrent/Permit.scala b/modules/kernel/src/main/scala/tofu/concurrent/Permit.scala new file mode 100644 index 000000000..1b9081a02 --- /dev/null +++ b/modules/kernel/src/main/scala/tofu/concurrent/Permit.scala @@ -0,0 +1,58 @@ +package tofu.concurrent + +import tofu.internal.instances.MakePermitInstance + +/** A purely functional semaphore. A semaphore has a non-negative number of permits available. Acquiring a permit + * decrements the current number of permits and releasing a permit increases the current number of permits. An acquire + * that occurs when there are no permits available results in semantic blocking until a permit becomes available. + * Blocking `withPermit` are cancelable. + */ +trait Permit[F[_]] { + + /** Returns an effect that acquires a permit, runs the supplied effect, and then releases the permit. The returned + * effect semantically blocks until permit are available. Note that acquires are statisfied in strict FIFO order. + */ + def withPermit[A](fa: F[A]): F[A] + +} + +object Permit { + type Make[F[_]] = MakePermit[F, F] + + /** A helper for creating instances of [[tofu.concurrent.Permit]] that use the same effect during construction and + * work. If you want to use different effect to construct `Permit` use [[tofu.concurrent.MakePermit]] + */ + def Make[F[_]](implicit makePermit: Make[F]): MakePermit.PermitApplier[F, F] = + new MakePermit.PermitApplier[F, F](makePermit) +} + +/** A creator of [[tofu.concurrent.Permit]] that supports effectful construction + * @tparam I + * effect for creation of agent + * @tparam F + * effect on which agent will run + */ +trait MakePermit[I[_], F[_]] { + + /** Creates instance of [[tofu.concurrent.Permit]], initialized with `limit` available permits + * + * @param limit + * maximum concurrent permits + * @return + * `I[ Permit[F] ]` + */ + def permitOf(limit: Long): I[Permit[F]] +} + +/** A helper for creating instances of [[tofu.concurrent.Permit]] that use different effects during construction and + * work. If you want to use same effect to construct and run `Permit` use [[tofu.concurrent.Permit.Make]] + */ +object MakePermit extends MakePermitInstance { + + def apply[I[_], F[_]](implicit mkPermit: MakePermit[I, F]): PermitApplier[I, F] = + new PermitApplier[I, F](mkPermit) + + final class PermitApplier[I[_], F[_]](private val mkPermit: MakePermit[I, F]) extends AnyVal { + def of(limit: Long): I[Permit[F]] = mkPermit.permitOf(limit) + } +} diff --git a/modules/kernel/src/main/scala/tofu/internal/carriers/concurrent.scala b/modules/kernel/src/main/scala/tofu/internal/carriers/concurrent.scala index 08dc02f32..52fc40568 100644 --- a/modules/kernel/src/main/scala/tofu/internal/carriers/concurrent.scala +++ b/modules/kernel/src/main/scala/tofu/internal/carriers/concurrent.scala @@ -1,6 +1,6 @@ package tofu.internal.carriers -import tofu.concurrent.{MakeAgent, MakeAtom, MakeQVar, MakeSerialAgent} +import tofu.concurrent.{MakeAgent, MakeAtom, MakePermit, MakeQVar, MakeSerialAgent} trait MkAtomCE2Carrier[I[_], F[_]] extends MakeAtom[I, F] @@ -33,3 +33,11 @@ object MkSerialAgentCE2Carrier extends MkSerialAgentCE2CarrierMacro trait MkSerialAgentCE3Carrier[I[_], F[_]] extends MakeSerialAgent[I, F] object MkSerialAgentCE3Carrier extends MkSerialAgentCE3CarrierMacro + +trait MkPermitCE2Carrier[I[_], F[_]] extends MakePermit[I, F] + +object MkPermitCE2Carrier extends MkPermitCE2CarrierMacro + +trait MkPermitCE3Carrier[I[_], F[_]] extends MakePermit[I, F] + +object MkPermitCE3Carrier extends MkPermitCE3CarrierMacro