From b4ecd33042c371bd74b85372d68cb8b1c24a5cb0 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:38:19 +0000 Subject: [PATCH 01/54] Execute `IO.blocking` on WSTP without `BlockContext` indirection --- .../effect/unsafe/WorkStealingThreadPool.scala | 8 +++++--- .../scala/cats/effect/unsafe/WorkerThread.scala | 15 ++++++++++----- .../src/main/scala/cats/effect/IOFiber.scala | 2 +- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index cd740b0b41..e17ae8ac61 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -485,14 +485,16 @@ private[effect] final class WorkStealingThreadPool( } /** - * Checks if the blocking code can be executed in the current context (only returns true for - * worker threads that belong to this execution context). + * Checks if the blocking code can be executed in the current context and, if so, prepares it + * for blocking. Only returns true for worker threads that belong to this execution context. */ private[effect] def canExecuteBlockingCode(): Boolean = { val thread = Thread.currentThread() if (thread.isInstanceOf[WorkerThread]) { val worker = thread.asInstanceOf[WorkerThread] - worker.canExecuteBlockingCodeOn(this) + if (worker.canExecuteBlockingCodeOn(this)) + worker.prepareBlocking() + true } else { false } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 849e71a2d4..e3e6fb96ae 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -776,7 +776,7 @@ private final class WorkerThread( } /** - * A mechanism for executing support code before executing a blocking action. + * Support code that must be run before executing a blocking action on this thread. * * The current thread creates a replacement worker thread (or reuses a cached one) that will * take its place in the pool and does a complete transfer of ownership of the data structures @@ -797,7 +797,7 @@ private final class WorkerThread( * There is no reason to enclose any code in a `try/catch` block because the only way this * code path can be exercised is through `IO.delay`, which already handles exceptions. */ - override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + def prepareBlocking(): Unit = { val rnd = random pool.notifyParked(rnd) @@ -806,7 +806,6 @@ private final class WorkerThread( // This `WorkerThread` is already inside an enclosing blocking region. // There is no need to spawn another `WorkerThread`. Instead, directly // execute the blocking action. - thunk } else { // Spawn a new `WorkerThread` to take the place of this thread, as the // current thread prepares to execute a blocking action. @@ -853,11 +852,17 @@ private final class WorkerThread( pool.blockedWorkerThreadCounter.incrementAndGet() clone.start() } - - thunk } } + /** + * A mechanism for executing support code before executing a blocking action. + */ + override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + prepareBlocking() + thunk + } + private[this] def init(newIdx: Int): Unit = { _index = newIdx queue = pool.localQueues(newIdx) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index ffea35d49c..c3b59b3724 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -994,7 +994,7 @@ private final class IOFiber[A]( var error: Throwable = null val r = try { - scala.concurrent.blocking(cur.thunk()) + cur.thunk() } catch { case t if NonFatal(t) => error = t From e4a30c5f7598db2091aca8bf7af76126742591bb Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:44:33 +0000 Subject: [PATCH 02/54] Remove stale comment --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index e3e6fb96ae..05be8fc974 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -804,8 +804,7 @@ private final class WorkerThread( if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. - // There is no need to spawn another `WorkerThread`. Instead, directly - // execute the blocking action. + // There is no need to spawn another `WorkerThread`. } else { // Spawn a new `WorkerThread` to take the place of this thread, as the // current thread prepares to execute a blocking action. From 9e8f282eebe3eeda7183603baa8d6273739cf252 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:48:00 +0000 Subject: [PATCH 03/54] Update comments --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 05be8fc974..99c3ffeff4 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -803,7 +803,7 @@ private final class WorkerThread( pool.notifyParked(rnd) if (blocking) { - // This `WorkerThread` is already inside an enclosing blocking region. + // This `WorkerThread` has already been prepared for blocking. // There is no need to spawn another `WorkerThread`. } else { // Spawn a new `WorkerThread` to take the place of this thread, as the @@ -817,7 +817,7 @@ private final class WorkerThread( cedeBypass = null } - // Logically enter the blocking region. + // Logically become a blocking thread blocking = true val prefix = pool.blockerThreadPrefix From de501427201ff2fcd56513da02f26c4b291ae090 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 02:50:34 +0000 Subject: [PATCH 04/54] We like full sentences here. --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 99c3ffeff4..893df5d223 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -817,7 +817,7 @@ private final class WorkerThread( cedeBypass = null } - // Logically become a blocking thread + // Logically become a blocking thread. blocking = true val prefix = pool.blockerThreadPrefix From c2ef8bc67f1cc6862d5c9c737f6e107406c27c40 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 24 Nov 2023 10:47:57 -0800 Subject: [PATCH 05/54] Fix conditionals --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index e17ae8ac61..dd0c564ecd 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -492,12 +492,12 @@ private[effect] final class WorkStealingThreadPool( val thread = Thread.currentThread() if (thread.isInstanceOf[WorkerThread]) { val worker = thread.asInstanceOf[WorkerThread] - if (worker.canExecuteBlockingCodeOn(this)) + if (worker.canExecuteBlockingCodeOn(this)) { worker.prepareBlocking() - true - } else { - false + return true + } } + false } /** From 3b12e3ebe91c3e347b8da316637321ec5d2b4269 Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Thu, 28 Dec 2023 11:32:06 +0100 Subject: [PATCH 06/54] fix: acquire semaphore only when there is a resource present --- .../src/main/scala/cats/effect/std/Hotswap.scala | 12 ++++++++++-- .../src/test/scala/cats/effect/std/HotswapSpec.scala | 12 +++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index cf68fb204d..922cc24db4 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -121,7 +121,15 @@ object Hotswap { def raise(message: String): F[Unit] = F.raiseError[Unit](new RuntimeException(message)) - def shared: Resource[F, Unit] = semaphore.permit + def shared(state: Ref[F, State]): Resource[F, Unit] = + Resource.makeFull[F, Unit] { poll => + poll( + state.get.flatMap { + case Acquired(_, _) => semaphore.acquire + case _ => F.unit + } + ) + } { _ => semaphore.release } def exclusive: Resource[F, Unit] = Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => @@ -141,7 +149,7 @@ object Hotswap { } override def get: Resource[F, Option[R]] = - shared.evalMap { _ => + shared(state).evalMap { _ => state.get.map { case Acquired(r, _) => Some(r) case _ => None diff --git a/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala b/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala index c335d7782e..3e451625e1 100644 --- a/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala @@ -165,6 +165,16 @@ class HotswapSpec extends BaseSpec { outer => TestControl.executeEmbed(go, IORuntimeConfig(1, 2)).replicateA_(1000) must completeAs(()) } - } + "get should not acquire a lock when there is no resource present" in ticked { + implicit ticker => + val go = Hotswap.create[IO, Unit].use { hs => + hs.get.useForever.start *> + hs.swap(IO.sleep(1.second).toResource) *> + IO.sleep(2.seconds) *> + hs.get.use_.timeout(1.second).void + } + go must completeAs(()) + } + } } From 3fe4f8ce203b24753b7008e0446c99e8d5bf04d1 Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Fri, 29 Dec 2023 17:15:46 +0100 Subject: [PATCH 07/54] fix: release permit condition and scope of the poll --- .../src/main/scala/cats/effect/std/Hotswap.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index 922cc24db4..dc71588d52 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -121,15 +121,13 @@ object Hotswap { def raise(message: String): F[Unit] = F.raiseError[Unit](new RuntimeException(message)) - def shared(state: Ref[F, State]): Resource[F, Unit] = - Resource.makeFull[F, Unit] { poll => - poll( - state.get.flatMap { - case Acquired(_, _) => semaphore.acquire - case _ => F.unit - } - ) - } { _ => semaphore.release } + def shared(state: Ref[F, State]): Resource[F, Boolean] = + Resource.makeFull[F, Boolean] { poll => + state.get.flatMap { + case Acquired(_, _) => poll(semaphore.acquire.as(true)) + case _ => F.pure(false) + } + } { r => if (r) semaphore.release else F.unit } def exclusive: Resource[F, Unit] = Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => From 8e1ff54b9b6f4ccd1b5287cb1ff384c56284090a Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Fri, 29 Dec 2023 18:55:42 +0100 Subject: [PATCH 08/54] fix: race condition --- .../main/scala/cats/effect/std/Hotswap.scala | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index dc71588d52..0ac3903eb9 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -121,14 +121,6 @@ object Hotswap { def raise(message: String): F[Unit] = F.raiseError[Unit](new RuntimeException(message)) - def shared(state: Ref[F, State]): Resource[F, Boolean] = - Resource.makeFull[F, Boolean] { poll => - state.get.flatMap { - case Acquired(_, _) => poll(semaphore.acquire.as(true)) - case _ => F.pure(false) - } - } { r => if (r) semaphore.release else F.unit } - def exclusive: Resource[F, Unit] = Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => semaphore.releaseN(Long.MaxValue)) @@ -147,12 +139,12 @@ object Hotswap { } override def get: Resource[F, Option[R]] = - shared(state).evalMap { _ => - state.get.map { - case Acquired(r, _) => Some(r) - case _ => None + Resource.makeFull[F, Option[R]] { poll => + state.get.flatMap { + case Acquired(r, _) => poll(semaphore.acquire.as(Some(r))) + case _ => F.pure(None) } - } + } { r => if (r.isDefined) semaphore.release else F.unit } override def clear: F[Unit] = exclusive.surround(swapFinalizer(Cleared).uncancelable) From 281b9db85c979ece85658b6102c1dc6c11863edb Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Sat, 30 Dec 2023 10:42:24 +0100 Subject: [PATCH 09/54] fix: run state.get only when semaphore is aquired --- .../main/scala/cats/effect/std/Hotswap.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index 0ac3903eb9..92ee9bf816 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -121,6 +121,14 @@ object Hotswap { def raise(message: String): F[Unit] = F.raiseError[Unit](new RuntimeException(message)) + def shared(state: Ref[F, State]): Resource[F, Boolean] = + Resource.makeFull[F, Boolean] { poll => + state.get.flatMap { + case Acquired(_, _) => poll(semaphore.acquire.as(true)) + case _ => F.pure(false) + } + } { r => if (r) semaphore.release else F.unit } + def exclusive: Resource[F, Unit] = Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => semaphore.releaseN(Long.MaxValue)) @@ -139,12 +147,14 @@ object Hotswap { } override def get: Resource[F, Option[R]] = - Resource.makeFull[F, Option[R]] { poll => - state.get.flatMap { - case Acquired(r, _) => poll(semaphore.acquire.as(Some(r))) - case _ => F.pure(None) - } - } { r => if (r.isDefined) semaphore.release else F.unit } + shared(state).evalMap { r => + if (r) + state.get.map { + case Acquired(r, _) => Some(r) + case _ => None + } + else F.pure[Option[R]](None) + } override def clear: F[Unit] = exclusive.surround(swapFinalizer(Cleared).uncancelable) From 3f433157e5f6300d960d509587094869f8b01301 Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Sun, 31 Dec 2023 15:55:22 +0100 Subject: [PATCH 10/54] fix: acquire semaphore before state.get --- std/shared/src/main/scala/cats/effect/std/Hotswap.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index 92ee9bf816..d061c1ffd6 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -123,10 +123,11 @@ object Hotswap { def shared(state: Ref[F, State]): Resource[F, Boolean] = Resource.makeFull[F, Boolean] { poll => - state.get.flatMap { - case Acquired(_, _) => poll(semaphore.acquire.as(true)) - case _ => F.pure(false) - } + poll(semaphore.acquire).flatMap(_ => + state.get.flatMap { + case Acquired(_, _) => F.pure(true) + case _ => semaphore.release.as(false) + }) } { r => if (r) semaphore.release else F.unit } def exclusive: Resource[F, Unit] = From 139d3f2b539e7a4c3005469bcdd81e4a5ca385b6 Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Sun, 31 Dec 2023 18:58:42 +0100 Subject: [PATCH 11/54] chore: merge shared and get methods and add code comment --- .../main/scala/cats/effect/std/Hotswap.scala | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index d061c1ffd6..da4f987950 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -73,8 +73,9 @@ sealed trait Hotswap[F[_], R] { def swap(next: Resource[F, R]): F[R] /** - * Gets the current resource, if it exists. The returned resource is guaranteed to be - * available for the duration of the returned resource. + * Acquires a shared lock to retrieve the current resource, if it exists. The returned + * resource is guaranteed to be available for its duration. The lock is released if the + * current resource does not exist. */ def get: Resource[F, Option[R]] @@ -121,15 +122,6 @@ object Hotswap { def raise(message: String): F[Unit] = F.raiseError[Unit](new RuntimeException(message)) - def shared(state: Ref[F, State]): Resource[F, Boolean] = - Resource.makeFull[F, Boolean] { poll => - poll(semaphore.acquire).flatMap(_ => - state.get.flatMap { - case Acquired(_, _) => F.pure(true) - case _ => semaphore.release.as(false) - }) - } { r => if (r) semaphore.release else F.unit } - def exclusive: Resource[F, Unit] = Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => semaphore.releaseN(Long.MaxValue)) @@ -148,14 +140,13 @@ object Hotswap { } override def get: Resource[F, Option[R]] = - shared(state).evalMap { r => - if (r) - state.get.map { - case Acquired(r, _) => Some(r) - case _ => None + Resource.makeFull[F, Option[R]] { poll => + poll(semaphore.acquire) *> + state.get.flatMap { + case Acquired(r, _) => F.pure(Some(r)) + case _ => semaphore.release.as(None) } - else F.pure[Option[R]](None) - } + } { r => if (r.isDefined) semaphore.release else F.unit } override def clear: F[Unit] = exclusive.surround(swapFinalizer(Cleared).uncancelable) From bd3472ce623fe5e13cf70ca9942380bd435a4ad2 Mon Sep 17 00:00:00 2001 From: Jose Antonio Garcia Martagon <83660317+josgarmar28@users.noreply.github.com> Date: Tue, 2 Jan 2024 13:03:18 +0100 Subject: [PATCH 12/54] Update std/shared/src/main/scala/cats/effect/std/Hotswap.scala Add comment Co-authored-by: Arman Bilge --- std/shared/src/main/scala/cats/effect/std/Hotswap.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index da4f987950..9718263509 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -141,7 +141,7 @@ object Hotswap { override def get: Resource[F, Option[R]] = Resource.makeFull[F, Option[R]] { poll => - poll(semaphore.acquire) *> + poll(semaphore.acquire) *> // acquire shared lock state.get.flatMap { case Acquired(r, _) => F.pure(Some(r)) case _ => semaphore.release.as(None) From 7f05819684c4e374f504444ef04f847da4f5dc99 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:02:24 +0000 Subject: [PATCH 13/54] Fix off-by-1 in `CallbackStack#pack` --- .../scala/cats/effect/CallbackStack.scala | 2 +- .../scala/cats/effect/CallbackStackSpec.scala | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 8ba7b08a2d..0a23745921 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -127,7 +127,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) } else { if (child == null) { // bottomed out - removed + removed + 1 } else { // note this can cause the bound to go negative, which is fine child.packInternal(bound - 1, removed + 1, parent) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala new file mode 100644 index 0000000000..806f2eb40b --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2020-2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +class CallbackStackSpec extends BaseSpec { + + "CallbackStack" should { + "correctly report the number removed" in { + val stack = CallbackStack[Unit](null) + val pushed = stack.push(_ => ()) + val handle = pushed.currentHandle() + pushed.clearCurrent(handle) + stack.pack(1) must beEqualTo(1) + } + } + +} From 444512f0ddda7ebce125e44de1e9ec9b6f5218c9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:54:56 +0000 Subject: [PATCH 14/54] Isolate thread state change to `prepareBlocking()` --- .../effect/unsafe/WorkStealingThreadPool.scala | 1 + .../effect/unsafe/WorkStealingThreadPool.scala | 18 +++++++++++++----- .../src/main/scala/cats/effect/IOFiber.scala | 2 ++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index f47fc7889a..702fa612a8 100644 --- a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -36,6 +36,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private () task: Runnable, fallback: Scheduler): Runnable private[effect] def canExecuteBlockingCode(): Boolean + private[effect] def prepareBlocking(): Unit private[unsafe] def liveTraces(): ( Map[Runnable, Trace], Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])], diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index dd0c564ecd..a3e66f897d 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -492,12 +492,20 @@ private[effect] final class WorkStealingThreadPool( val thread = Thread.currentThread() if (thread.isInstanceOf[WorkerThread]) { val worker = thread.asInstanceOf[WorkerThread] - if (worker.canExecuteBlockingCodeOn(this)) { - worker.prepareBlocking() - return true - } + worker.canExecuteBlockingCodeOn(this) + } else { + false } - false + } + + /** + * Prepares the current thread for running blocking code. This should be called only if + * [[canExecuteBlockingCode]] returns `true`. + */ + private[effect] def prepareBlocking(): Unit = { + val thread = Thread.currentThread() + val worker = thread.asInstanceOf[WorkerThread] + worker.prepareBlocking() } /** diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index c3b59b3724..402cbd75f2 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -991,6 +991,8 @@ private final class IOFiber[A]( if (ec.isInstanceOf[WorkStealingThreadPool]) { val wstp = ec.asInstanceOf[WorkStealingThreadPool] if (wstp.canExecuteBlockingCode()) { + wstp.prepareBlocking() + var error: Throwable = null val r = try { From bf03fb3004943071c049e7f06684e7e3f15d4950 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:57:57 +0000 Subject: [PATCH 15/54] Restore scaladoc --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index a3e66f897d..88e0cceff3 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -485,8 +485,8 @@ private[effect] final class WorkStealingThreadPool( } /** - * Checks if the blocking code can be executed in the current context and, if so, prepares it - * for blocking. Only returns true for worker threads that belong to this execution context. + * Checks if the blocking code can be executed in the current context (only returns true for + * worker threads that belong to this execution context). */ private[effect] def canExecuteBlockingCode(): Boolean = { val thread = Thread.currentThread() From 862b60cd1f27369881b2c320a3d06a975c703d75 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 20:59:20 +0000 Subject: [PATCH 16/54] Bikeshed --- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 2 +- .../scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 4 ++-- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 4 ++-- core/shared/src/main/scala/cats/effect/IOFiber.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 702fa612a8..1b0ff7b5a2 100644 --- a/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -36,7 +36,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private () task: Runnable, fallback: Scheduler): Runnable private[effect] def canExecuteBlockingCode(): Boolean - private[effect] def prepareBlocking(): Unit + private[effect] def prepareForBlocking(): Unit private[unsafe] def liveTraces(): ( Map[Runnable, Trace], Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])], diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 88e0cceff3..ab815b25e3 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -502,10 +502,10 @@ private[effect] final class WorkStealingThreadPool( * Prepares the current thread for running blocking code. This should be called only if * [[canExecuteBlockingCode]] returns `true`. */ - private[effect] def prepareBlocking(): Unit = { + private[effect] def prepareForBlocking(): Unit = { val thread = Thread.currentThread() val worker = thread.asInstanceOf[WorkerThread] - worker.prepareBlocking() + worker.prepareForBlocking() } /** diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 893df5d223..35d265d251 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -797,7 +797,7 @@ private final class WorkerThread( * There is no reason to enclose any code in a `try/catch` block because the only way this * code path can be exercised is through `IO.delay`, which already handles exceptions. */ - def prepareBlocking(): Unit = { + def prepareForBlocking(): Unit = { val rnd = random pool.notifyParked(rnd) @@ -858,7 +858,7 @@ private final class WorkerThread( * A mechanism for executing support code before executing a blocking action. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - prepareBlocking() + prepareForBlocking() thunk } diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 402cbd75f2..d0f86707a6 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -991,7 +991,7 @@ private final class IOFiber[A]( if (ec.isInstanceOf[WorkStealingThreadPool]) { val wstp = ec.asInstanceOf[WorkStealingThreadPool] if (wstp.canExecuteBlockingCode()) { - wstp.prepareBlocking() + wstp.prepareForBlocking() var error: Throwable = null val r = From 8341a4ee3a57f2eadff91af15173caeb2a53f635 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 21:47:05 +0000 Subject: [PATCH 17/54] Remove unneeded `notifyParked` when worker transitions to blocking --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 849e71a2d4..e731f11827 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -800,8 +800,6 @@ private final class WorkerThread( override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { val rnd = random - pool.notifyParked(rnd) - if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. // There is no need to spawn another `WorkerThread`. Instead, directly From 05145d993f17c66f561ee9270374fe183eb550e1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 14:05:15 -0800 Subject: [PATCH 18/54] `random` is no longer used --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index e731f11827..4c72714afb 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -798,8 +798,6 @@ private final class WorkerThread( * code path can be exercised is through `IO.delay`, which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - val rnd = random - if (blocking) { // This `WorkerThread` is already inside an enclosing blocking region. // There is no need to spawn another `WorkerThread`. Instead, directly From 0450001e9150de9b268604db473ac1a254bd49c6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 22:06:45 +0000 Subject: [PATCH 19/54] Relocate scaladoc note --- .../src/main/scala/cats/effect/unsafe/WorkerThread.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 35d265d251..ab3a068407 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -792,10 +792,6 @@ private final class WorkerThread( * continue, it will be cached for a period of time instead. Finally, the `blocking` flag is * useful when entering nested blocking regions. In this case, there is no need to spawn a * replacement worker thread. - * - * @note - * There is no reason to enclose any code in a `try/catch` block because the only way this - * code path can be exercised is through `IO.delay`, which already handles exceptions. */ def prepareForBlocking(): Unit = { val rnd = random @@ -856,6 +852,10 @@ private final class WorkerThread( /** * A mechanism for executing support code before executing a blocking action. + * + * @note + * There is no reason to enclose any code in a `try/catch` block because the only way this + * code path can be exercised is through `IO.delay`, which already handles exceptions. */ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { prepareForBlocking() From b442eb2e3516a17592be9083c77748e2ce0549a6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 8 Jan 2024 22:42:22 +0000 Subject: [PATCH 20/54] Reset auto-cede counter after `blocking` --- core/shared/src/main/scala/cats/effect/IOFiber.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index ffea35d49c..f39d6114d7 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -1003,7 +1003,8 @@ private final class IOFiber[A]( } val next = if (error eq null) succeeded(r, 0) else failed(error, 0) - runLoop(next, nextCancelation, nextAutoCede) + // reset auto-cede counter + runLoop(next, nextCancelation, runtime.autoYieldThreshold) } else { blockingFallback(cur) } From 6e2e87d57c7f7db7295dd7cdb5a8f2af9963c0a9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 9 Jan 2024 01:20:06 +0000 Subject: [PATCH 21/54] Add test for `CallbackStack#pack` race condition Co-authored-by: Matthias Ernst --- .../scala/cats/effect/CallbackStackSpec.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 806f2eb40b..c5afd5fa10 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -26,6 +26,31 @@ class CallbackStackSpec extends BaseSpec { pushed.clearCurrent(handle) stack.pack(1) must beEqualTo(1) } + + "handle race conditions in pack" in real { + IO { + val stack = CallbackStack[Unit](null) + locally { + val pushed = stack.push(_ => ()) + val handle = pushed.currentHandle() + pushed.clearCurrent(handle) + } + val clear = { + val pushed = stack.push(_ => ()) + val handle = pushed.currentHandle() + IO(pushed.clearCurrent(handle)) + } + (stack, clear) + }.flatMap { + case (stack, clear) => + val pack = IO(stack.pack(1)) + pack.both(clear *> pack).map { + case (x, y) => + (x + y) must beEqualTo(2) + } + }.replicateA_(1000) + .as(ok) + } } } From 632ca06617537ae6f0f96d9eb3fe5d1b52946679 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 9 Jan 2024 16:01:02 +0000 Subject: [PATCH 22/54] wip pack lock Co-authored-by: Sam Pillsworth Co-authored-by: Matthias Ernst --- .../scala/cats/effect/CallbackStack.scala | 99 +++++++++++++++++-- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 0a23745921..404e8ffd36 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -18,17 +18,94 @@ package cats.effect import scala.annotation.tailrec +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import CallbackStack.Node + +private final class CallbackStack[A](private[this] var callback: A => Unit) + extends AtomicReference[Node[A]] { + head => + + private[this] val allowedToPack = new AtomicBoolean(true) + + def push(cb: A => Unit): Node[A] = { + val newHead = new Node(cb) + + @tailrec + def loop(): CallbackStack[A] = { + val currentHead = head.get() + newHead.next = currentHead + + if (!head.compareAndSet(currentHead, newHead)) + loop() + else + newHead + } + + loop() + } + + def unsafeSetCallback(cb: A => Unit): Unit = { + callback = cb + } + + /** + * Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true + * iff *any* callbacks were invoked. + */ + def apply(a: A): Boolean = { + while (!allowedToPack.compareAndSet(true, false)) { + // spinloop + } + + val cb = callback + var invoked = if (cb != null) { + cb(a) + true + } else { + false + } + var currentNode = head.get() + + while (currentNode ne null) { + val cb = currentNode.getCallback() + if (cb != null) { + cb(a) + invoked = true + } + currentNode = currentNode.next + } + + invoked + } +} + +private object CallbackStack { + private[CallbackStack] final class Node[A]( + private[this] var callback: A => Unit, + ) { + var next: Node[A] = _ + + def getCallback(): A => Unit = callback + + def clear(): Unit = { + callback = null + } + } +} + private final class CallbackStack[A](private[this] var callback: A => Unit) extends AtomicReference[CallbackStack[A]] { + val allowedToPack = new AtomicBoolean(true) + def push(next: A => Unit): CallbackStack[A] = { val attempt = new CallbackStack(next) @tailrec def loop(): CallbackStack[A] = { - val cur = get() + val cur = head.get() attempt.lazySet(cur) if (!compareAndSet(cur, attempt)) @@ -106,14 +183,20 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) * (amortized). This still biases the optimizations towards the head of the list, but ensures * that packing will still inevitably reach all of the garbage cells. */ - def pack(bound: Int): Int = { - // the first cell is always retained - val got = get() - if (got ne null) - got.packInternal(bound, 0, this) - else + def pack(bound: Int): Int = + if (allowedToPack.compareAndSet(true, false)) { + // the first cell is always retained + val got = get() + val rtn = + if (got ne null) + got.packInternal(bound, 0, this) + else + 0 + allowedToPack.set(true) + rtn + } else { 0 - } + } @tailrec private def packInternal(bound: Int, removed: Int, parent: CallbackStack[A]): Int = { From c04e1238097b787bc25686f341d2c65d59515b27 Mon Sep 17 00:00:00 2001 From: josgarmar28 Date: Tue, 9 Jan 2024 18:33:36 +0100 Subject: [PATCH 23/54] fix: test and revert description changes --- std/shared/src/main/scala/cats/effect/std/Hotswap.scala | 5 ++--- .../shared/src/test/scala/cats/effect/std/HotswapSpec.scala | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index 9718263509..05a39a9303 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -73,9 +73,8 @@ sealed trait Hotswap[F[_], R] { def swap(next: Resource[F, R]): F[R] /** - * Acquires a shared lock to retrieve the current resource, if it exists. The returned - * resource is guaranteed to be available for its duration. The lock is released if the - * current resource does not exist. + * Gets the current resource, if it exists. The returned resource is guaranteed to be + * available for the duration of the returned resource. */ def get: Resource[F, Option[R]] diff --git a/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala b/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala index 3e451625e1..e78434489d 100644 --- a/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala @@ -170,9 +170,8 @@ class HotswapSpec extends BaseSpec { outer => implicit ticker => val go = Hotswap.create[IO, Unit].use { hs => hs.get.useForever.start *> - hs.swap(IO.sleep(1.second).toResource) *> IO.sleep(2.seconds) *> - hs.get.use_.timeout(1.second).void + hs.swap(Resource.unit) } go must completeAs(()) } From dccd38098191c6a8f664ea46d9431b7bdc463ce9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 15:55:16 +0000 Subject: [PATCH 24/54] Implement `Fiber#join` via `asyncCheckAttempt` Co-authored-by: Sam Pillsworth --- .../src/main/scala/cats/effect/IOFiber.scala | 40 ++++++------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index f0c63cefeb..807a3a6b50 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -160,15 +160,21 @@ private final class IOFiber[A]( } /* this is swapped for an `IO.pure(outcome)` when we complete */ - private[this] var _join: IO[OutcomeIO[A]] = IO.async { cb => + private[this] var _join: IO[OutcomeIO[A]] = IO.asyncCheckAttempt { cb => IO { - val stack = registerListener(oc => cb(Right(oc))) + if (outcome == null) { + val back = callbacks.push(oc => cb(Right(oc))) - if (stack eq null) - Some(IO.unit) /* we were already invoked, so no `CallbackStack` needs to be managed */ - else { - val handle = stack.currentHandle() - Some(IO(stack.clearCurrent(handle))) + /* double-check */ + if (outcome != null) { + back.clearCurrent(back.currentHandle()) + Right(outcome) + } else { + val handle = back.currentHandle() + Left(Some(IO(back.clearCurrent(handle)))) + } + } else { + Right(outcome) } } } @@ -1168,26 +1174,6 @@ private final class IOFiber[A]( callbacks.unsafeSetCallback(cb) } - /* can return null, meaning that no CallbackStack needs to be later invalidated */ - private[this] def registerListener( - listener: OutcomeIO[A] => Unit): CallbackStack[OutcomeIO[A]] = { - if (outcome == null) { - val back = callbacks.push(listener) - - /* double-check */ - if (outcome != null) { - back.clearCurrent(back.currentHandle()) - listener(outcome) /* the implementation of async saves us from double-calls */ - null - } else { - back - } - } else { - listener(outcome) - null - } - } - @tailrec private[this] def succeeded(result: Any, depth: Int): IO[Any] = (ByteStack.pop(conts): @switch) match { From 64cf1b632f466abcb0be09869dca46cd8eac3cd3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:00:02 +0000 Subject: [PATCH 25/54] checkpoint pack lock Co-authored-by: Sam Pillsworth Co-authored-by: Matthias Ernst --- .../scala/cats/effect/CallbackStack.scala | 16 +- .../scala/cats/effect/CallbackStack.scala | 183 ++++++++---------- .../main/scala/cats/effect/IODeferred.scala | 7 +- .../src/main/scala/cats/effect/IOFiber.scala | 9 +- 4 files changed, 98 insertions(+), 117 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index faf744cbfb..a42042462c 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -18,14 +18,16 @@ package cats.effect import scala.scalajs.js +import CallbackStack.Handle + private trait CallbackStack[A] extends js.Object private final class CallbackStackOps[A](private val callbacks: js.Array[A => Unit]) extends AnyVal { - @inline def push(next: A => Unit): CallbackStack[A] = { + @inline def push(next: A => Unit): Handle[A] = { callbacks.push(next) - callbacks.asInstanceOf[CallbackStack[A]] + callbacks.length - 1 } @inline def unsafeSetCallback(cb: A => Unit): Unit = { @@ -36,25 +38,23 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni * Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true * iff *any* callbacks were invoked. */ - @inline def apply(oc: A, invoked: Boolean): Boolean = + @inline def apply(oc: A): Boolean = callbacks .asInstanceOf[js.Dynamic] .reduceRight( // skips deleted indices, but there can still be nulls (acc: Boolean, cb: A => Unit) => if (cb ne null) { cb(oc); true } else acc, - invoked) + false) .asInstanceOf[Boolean] /** * Removes the current callback from the queue. */ - @inline def clearCurrent(handle: Int): Unit = + @inline def clearHandle(handle: Handle[A]): Unit = // deleting an index from a js.Array makes it sparse (aka "holey"), so no memory leak js.special.delete(callbacks, handle) - @inline def currentHandle(): CallbackStack.Handle = callbacks.length - 1 - @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! @@ -68,5 +68,5 @@ private object CallbackStack { @inline implicit def ops[A](stack: CallbackStack[A]): CallbackStackOps[A] = new CallbackStackOps(stack.asInstanceOf[js.Array[A => Unit]]) - type Handle = Int + type Handle[A] = Int } diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 404e8ffd36..1c268b66c8 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -21,6 +21,7 @@ import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import CallbackStack.Handle import CallbackStack.Node private final class CallbackStack[A](private[this] var callback: A => Unit) @@ -29,11 +30,15 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) private[this] val allowedToPack = new AtomicBoolean(true) - def push(cb: A => Unit): Node[A] = { + /** + * Pushes a callback to the top of the stack. Returns a handle that can be used with + * [[clearHandle]] to clear the callback. + */ + def push(cb: A => Unit): Handle[A] = { val newHead = new Node(cb) @tailrec - def loop(): CallbackStack[A] = { + def loop(): Handle[A] = { val currentHead = head.get() newHead.next = currentHead @@ -55,9 +60,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) * iff *any* callbacks were invoked. */ def apply(a: A): Boolean = { - while (!allowedToPack.compareAndSet(true, false)) { - // spinloop - } + // TODO should we read allowedToPack for memory effect? val cb = callback var invoked = if (cb != null) { @@ -79,82 +82,22 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) invoked } -} - -private object CallbackStack { - private[CallbackStack] final class Node[A]( - private[this] var callback: A => Unit, - ) { - var next: Node[A] = _ - - def getCallback(): A => Unit = callback - - def clear(): Unit = { - callback = null - } - } -} - -private final class CallbackStack[A](private[this] var callback: A => Unit) - extends AtomicReference[CallbackStack[A]] { - - val allowedToPack = new AtomicBoolean(true) - - def push(next: A => Unit): CallbackStack[A] = { - val attempt = new CallbackStack(next) - - @tailrec - def loop(): CallbackStack[A] = { - val cur = head.get() - attempt.lazySet(cur) - - if (!compareAndSet(cur, attempt)) - loop() - else - attempt - } - - loop() - } - - def unsafeSetCallback(cb: A => Unit): Unit = { - callback = cb - } /** - * Invokes *all* non-null callbacks in the queue, starting with the current one. Returns true - * iff *any* callbacks were invoked. + * Removes the callback referenced by a handle. */ - @tailrec - def apply(oc: A, invoked: Boolean): Boolean = { - val cb = callback - - val invoked2 = if (cb != null) { - cb(oc) - true - } else { - invoked - } - - val next = get() - if (next != null) - next(oc, invoked2) - else - invoked2 + def clearHandle(handle: CallbackStack.Handle[A]): Unit = { + handle.clear() } /** - * Removes the current callback from the queue. + * Nulls all references in this callback stack. */ - def clearCurrent(handle: CallbackStack.Handle): Unit = { - val _ = handle + def clear(): Unit = { callback = null + head.lazySet(null) } - def currentHandle(): CallbackStack.Handle = 0 - - def clear(): Unit = lazySet(null) - /** * It is intended that `bound` be tracked externally and incremented on each clear(). Whenever * pack is called, the number of empty cells removed from the stack is produced. It is @@ -185,11 +128,10 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) */ def pack(bound: Int): Int = if (allowedToPack.compareAndSet(true, false)) { - // the first cell is always retained - val got = get() + val got = head.get() val rtn = if (got ne null) - got.packInternal(bound, 0, this) + got.packHead(bound, this) else 0 allowedToPack.set(true) @@ -198,42 +140,83 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) 0 } - @tailrec - private def packInternal(bound: Int, removed: Int, parent: CallbackStack[A]): Int = { - if (callback == null) { - val child = get() +} + +private object CallbackStack { + def apply[A](callback: A => Unit): CallbackStack[A] = + new CallbackStack(callback) - // doing this cas here ultimately deoptimizes contiguous empty chunks - if (!parent.compareAndSet(this, child)) { - // if we're contending with another pack(), just bail and let them continue - removed + sealed abstract class Handle[A] { + private[CallbackStack] def clear(): Unit + } + + private[CallbackStack] final class Node[A]( + private[this] var callback: A => Unit + ) extends Handle[A] { + var next: Node[A] = _ + + def getCallback(): A => Unit = callback + + def clear(): Unit = { + callback = null + } + + @tailrec + def packHead(bound: Int, root: CallbackStack[A]): Int = { + val next = this.next // local copy + + if (callback == null) { + if (root.compareAndSet(this, next)) { + if (next == null) { + // bottomed out + 1 + } else { + // note this can cause the bound to go negative, which is fine + next.packTail(bound - 1, 1, this) + } + } else { // get the new top of the stack and start over + root.get().packHead(bound, root) + } } else { - if (child == null) { + if (next == null) { + // bottomed out + 0 + } else { + if (bound > 0) + next.packTail(bound - 1, 0, this) + else + 0 + } + } + } + + @tailrec + private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = { + val next = this.next // local copy + + if (callback == null) { + // We own the pack lock, so it is safe to write `next`. It will be published to subsequent packs via the lock. + // Concurrent readers ie `CallbackStack#apply` may read a stale value for `next` still pointing to this node. + // This is okay b/c the new `next` (the tail) is still reachable via the old `next` (this node). + prev.next = next + if (next == null) { // bottomed out removed + 1 } else { // note this can cause the bound to go negative, which is fine - child.packInternal(bound - 1, removed + 1, parent) + next.packTail(bound - 1, removed + 1, prev) } - } - } else { - val child = get() - if (child == null) { - // bottomed out - removed } else { - if (bound > 0) - child.packInternal(bound - 1, removed, this) - else + if (next == null) { + // bottomed out removed + } else { + if (bound > 0) + next.packTail(bound - 1, removed, this) + else + removed + } } } } } - -private object CallbackStack { - def apply[A](cb: A => Unit): CallbackStack[A] = - new CallbackStack(cb) - - type Handle = Byte -} diff --git a/core/shared/src/main/scala/cats/effect/IODeferred.scala b/core/shared/src/main/scala/cats/effect/IODeferred.scala index 217af8360a..d586807ef0 100644 --- a/core/shared/src/main/scala/cats/effect/IODeferred.scala +++ b/core/shared/src/main/scala/cats/effect/IODeferred.scala @@ -23,11 +23,10 @@ private final class IODeferred[A] extends Deferred[IO, A] { private[this] val initial: IO[A] = { val await = IO.asyncCheckAttempt[A] { cb => IO { - val stack = callbacks.push(cb) - val handle = stack.currentHandle() + val handle = callbacks.push(cb) def clear(): Unit = { - stack.clearCurrent(handle) + callbacks.clearHandle(handle) val clearCount = clearCounter.incrementAndGet() if ((clearCount & (clearCount - 1)) == 0) // power of 2 clearCounter.addAndGet(-callbacks.pack(clearCount)) @@ -59,7 +58,7 @@ private final class IODeferred[A] extends Deferred[IO, A] { def complete(a: A): IO[Boolean] = IO { if (cell.compareAndSet(initial, IO.pure(a))) { - val _ = callbacks(Right(a), false) + val _ = callbacks(Right(a)) callbacks.clear() // avoid leaks true } else { diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 807a3a6b50..465329dfee 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -163,15 +163,14 @@ private final class IOFiber[A]( private[this] var _join: IO[OutcomeIO[A]] = IO.asyncCheckAttempt { cb => IO { if (outcome == null) { - val back = callbacks.push(oc => cb(Right(oc))) + val handle = callbacks.push(oc => cb(Right(oc))) /* double-check */ if (outcome != null) { - back.clearCurrent(back.currentHandle()) + callbacks.clearHandle(handle) Right(outcome) } else { - val handle = back.currentHandle() - Left(Some(IO(back.clearCurrent(handle)))) + Left(Some(IO(callbacks.clearHandle(handle)))) } } else { Right(outcome) @@ -1061,7 +1060,7 @@ private final class IOFiber[A]( outcome = oc try { - if (!callbacks(oc, false) && runtime.config.reportUnhandledFiberErrors) { + if (!callbacks(oc) && runtime.config.reportUnhandledFiberErrors) { oc match { case Outcome.Errored(e) => currentCtx.reportFailure(e) case _ => () From 7f38b40710fa72214bc7b007a1ea2d75c3450934 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:34:16 +0000 Subject: [PATCH 26/54] Fix recursion when head is removed Co-authored-by: Sam Pillsworth --- .../scala/cats/effect/CallbackStack.scala | 10 ++++---- .../scala/cats/effect/CallbackStackSpec.scala | 25 ++++++++++--------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 1c268b66c8..b623e07bcb 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -131,7 +131,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) val got = head.get() val rtn = if (got ne null) - got.packHead(bound, this) + got.packHead(bound, 0, this) else 0 allowedToPack.set(true) @@ -162,20 +162,20 @@ private object CallbackStack { } @tailrec - def packHead(bound: Int, root: CallbackStack[A]): Int = { + def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = { val next = this.next // local copy if (callback == null) { if (root.compareAndSet(this, next)) { if (next == null) { // bottomed out - 1 + removed + 1 } else { // note this can cause the bound to go negative, which is fine - next.packTail(bound - 1, 1, this) + root.get().packHead(bound - 1, removed + 1, root) } } else { // get the new top of the stack and start over - root.get().packHead(bound, root) + root.get().packHead(bound, removed, root) } } else { if (next == null) { diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index c5afd5fa10..1f64b3b5b0 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -16,14 +16,15 @@ package cats.effect +import cats.syntax.all._ + class CallbackStackSpec extends BaseSpec { "CallbackStack" should { "correctly report the number removed" in { val stack = CallbackStack[Unit](null) - val pushed = stack.push(_ => ()) - val handle = pushed.currentHandle() - pushed.clearCurrent(handle) + val handle = stack.push(_ => ()) + stack.clearHandle(handle) stack.pack(1) must beEqualTo(1) } @@ -31,22 +32,22 @@ class CallbackStackSpec extends BaseSpec { IO { val stack = CallbackStack[Unit](null) locally { - val pushed = stack.push(_ => ()) - val handle = pushed.currentHandle() - pushed.clearCurrent(handle) + val handle = stack.push(_ => ()) + stack.clearHandle(handle) } val clear = { - val pushed = stack.push(_ => ()) - val handle = pushed.currentHandle() - IO(pushed.clearCurrent(handle)) + val handle = stack.push(_ => ()) + IO(stack.clearHandle(handle)) } (stack, clear) }.flatMap { case (stack, clear) => val pack = IO(stack.pack(1)) - pack.both(clear *> pack).map { - case (x, y) => - (x + y) must beEqualTo(2) + (pack.both(clear *> pack), pack).mapN { + case ((x, y), z) => + if ((x + y + z) != 2) + println((x, y, z)) + (x + y + z) must beEqualTo(2) } }.replicateA_(1000) .as(ok) From 1d8224f5c9228564cd11aaf9ad3ea21bb431240d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:48:09 +0000 Subject: [PATCH 27/54] If CASing head fails, pack tail anyway Co-authored-by: Sam Pillsworth --- .../src/main/scala/cats/effect/CallbackStack.scala | 6 +++--- .../src/test/scala/cats/effect/CallbackStackSpec.scala | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index b623e07bcb..422892a7db 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -172,10 +172,10 @@ private object CallbackStack { removed + 1 } else { // note this can cause the bound to go negative, which is fine - root.get().packHead(bound - 1, removed + 1, root) + next.packHead(bound - 1, removed + 1, root) } - } else { // get the new top of the stack and start over - root.get().packHead(bound, removed, root) + } else { // we were unable to remove ourselves, but we can still pack our tail + packTail(bound - 1, removed, this) } } else { if (next == null) { diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 1f64b3b5b0..ed7950ecbc 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -45,8 +45,6 @@ class CallbackStackSpec extends BaseSpec { val pack = IO(stack.pack(1)) (pack.both(clear *> pack), pack).mapN { case ((x, y), z) => - if ((x + y + z) != 2) - println((x, y, z)) (x + y + z) must beEqualTo(2) } }.replicateA_(1000) From ca8f1a32bea6072b1a5f3f3156314809ca741f8a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 16:55:52 +0000 Subject: [PATCH 28/54] More aggressive self-removal on failed CAS Co-authored-by: Sam Pillsworth --- .../main/scala/cats/effect/CallbackStack.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 422892a7db..cf26ebeee7 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -161,6 +161,9 @@ private object CallbackStack { callback = null } + /** + * Packs this head node + */ @tailrec def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = { val next = this.next // local copy @@ -174,8 +177,13 @@ private object CallbackStack { // note this can cause the bound to go negative, which is fine next.packHead(bound - 1, removed + 1, root) } - } else { // we were unable to remove ourselves, but we can still pack our tail - packTail(bound - 1, removed, this) + } else { + val prev = root.get() + if (prev.next eq this) { // prev is our new parent, we are its tail + this.packTail(bound, removed, prev) + } else { // we were unable to remove ourselves, but we can still pack our tail + next.packTail(bound - 1, removed, this) + } } } else { if (next == null) { @@ -190,6 +198,9 @@ private object CallbackStack { } } + /** + * Packs this non-head node + */ @tailrec private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = { val next = this.next // local copy From 1e54754b1398b32d5b34136a739a46fc19a252b1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 19:02:52 +0000 Subject: [PATCH 29/54] Fix NPE --- .../jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index cf26ebeee7..bfedce7eb5 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -181,8 +181,10 @@ private object CallbackStack { val prev = root.get() if (prev.next eq this) { // prev is our new parent, we are its tail this.packTail(bound, removed, prev) - } else { // we were unable to remove ourselves, but we can still pack our tail + } else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail next.packTail(bound - 1, removed, this) + } else { + removed } } } else { From ede463702cf5ad9e784ea64f2f1af70f820fbbc5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 19:47:40 +0000 Subject: [PATCH 30/54] Fix spec on JS, new race condition bug on JVM --- .../scala/cats/effect/CallbackStack.scala | 9 ++-- .../scala/cats/effect/CallbackStack.scala | 10 ++++- .../main/scala/cats/effect/IODeferred.scala | 12 +++--- .../src/main/scala/cats/effect/IOFiber.scala | 2 +- .../scala/cats/effect/CallbackStackSpec.scala | 42 +++++++++---------- 5 files changed, 41 insertions(+), 34 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index a42042462c..51b3cde4ca 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -49,16 +49,19 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni .asInstanceOf[Boolean] /** - * Removes the current callback from the queue. + * Removes the callback referenced by a handle. Returns `true` if the data structure was + * cleaned up immediately, `false` if a subsequent call to [[pack]] is required. */ - @inline def clearHandle(handle: Handle[A]): Unit = + @inline def clearHandle(handle: Handle[A]): Boolean = { // deleting an index from a js.Array makes it sparse (aka "holey"), so no memory leak js.special.delete(callbacks, handle) + true + } @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! - @inline def pack(bound: Int): Int = bound + @inline def pack(bound: Int): Int = 0 } private object CallbackStack { diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index bfedce7eb5..48d73ce291 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -84,10 +84,12 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) } /** - * Removes the callback referenced by a handle. + * Removes the callback referenced by a handle. Returns `true` if the data structure was + * cleaned up immediately, `false` if a subsequent call to [[pack]] is required. */ - def clearHandle(handle: CallbackStack.Handle[A]): Unit = { + def clearHandle(handle: CallbackStack.Handle[A]): Boolean = { handle.clear() + false } /** @@ -140,6 +142,8 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) 0 } + override def toString(): String = s"CallbackStack($callback, ${get()})" + } private object CallbackStack { @@ -231,5 +235,7 @@ private object CallbackStack { } } } + + override def toString(): String = s"Node($callback, $next)" } } diff --git a/core/shared/src/main/scala/cats/effect/IODeferred.scala b/core/shared/src/main/scala/cats/effect/IODeferred.scala index d586807ef0..33424e95bc 100644 --- a/core/shared/src/main/scala/cats/effect/IODeferred.scala +++ b/core/shared/src/main/scala/cats/effect/IODeferred.scala @@ -26,11 +26,13 @@ private final class IODeferred[A] extends Deferred[IO, A] { val handle = callbacks.push(cb) def clear(): Unit = { - callbacks.clearHandle(handle) - val clearCount = clearCounter.incrementAndGet() - if ((clearCount & (clearCount - 1)) == 0) // power of 2 - clearCounter.addAndGet(-callbacks.pack(clearCount)) - () + val removed = callbacks.clearHandle(handle) + if (!removed) { + val clearCount = clearCounter.incrementAndGet() + if ((clearCount & (clearCount - 1)) == 0) // power of 2 + clearCounter.addAndGet(-callbacks.pack(clearCount)) + () + } } val back = cell.get() diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 465329dfee..c7036009dd 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -170,7 +170,7 @@ private final class IOFiber[A]( callbacks.clearHandle(handle) Right(outcome) } else { - Left(Some(IO(callbacks.clearHandle(handle)))) + Left(Some(IO { callbacks.clearHandle(handle); () })) } } else { Right(outcome) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index ed7950ecbc..e28007f3a1 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -16,39 +16,35 @@ package cats.effect -import cats.syntax.all._ - class CallbackStackSpec extends BaseSpec { "CallbackStack" should { "correctly report the number removed" in { val stack = CallbackStack[Unit](null) val handle = stack.push(_ => ()) - stack.clearHandle(handle) + stack.push(_ => ()) + stack.clearHandle(handle) must beFalse stack.pack(1) must beEqualTo(1) } "handle race conditions in pack" in real { - IO { - val stack = CallbackStack[Unit](null) - locally { - val handle = stack.push(_ => ()) - stack.clearHandle(handle) - } - val clear = { - val handle = stack.push(_ => ()) - IO(stack.clearHandle(handle)) - } - (stack, clear) - }.flatMap { - case (stack, clear) => - val pack = IO(stack.pack(1)) - (pack.both(clear *> pack), pack).mapN { - case ((x, y), z) => - (x + y + z) must beEqualTo(2) - } - }.replicateA_(1000) - .as(ok) + + IO(CallbackStack[Unit](null)).flatMap { stack => + val pushClearPack = for { + handle <- IO(stack.push(_ => ())) + removed <- IO(stack.clearHandle(handle)) + packed <- IO(stack.pack(1)) + } yield (if (removed) 1 else 0) + packed + + pushClearPack + .both(pushClearPack) + .productL(IO(stack.toString).flatMap(IO.println)) + .product(IO(stack.pack(1))) + .debug() + .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } + .replicateA_(1000) + .as(ok) + } } } From 55ba5c6fdb7b50bbfedd2fad512d22cf7cb05bc9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 20:02:50 +0000 Subject: [PATCH 31/54] Actually fix spec on JS --- tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index e28007f3a1..0affa5271e 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -23,7 +23,7 @@ class CallbackStackSpec extends BaseSpec { val stack = CallbackStack[Unit](null) val handle = stack.push(_ => ()) stack.push(_ => ()) - stack.clearHandle(handle) must beFalse + stack.clearHandle(handle) stack.pack(1) must beEqualTo(1) } From e970713ede93ca4ca825e737985611e7403026ed Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:00:52 +0000 Subject: [PATCH 32/54] Passthrough `removed` --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 48d73ce291..b86115c768 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -197,7 +197,7 @@ private object CallbackStack { 0 } else { if (bound > 0) - next.packTail(bound - 1, 0, this) + next.packTail(bound - 1, removed, this) else 0 } From d448086b574d63e4ec385339f16216db100fcd0f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:06:16 +0000 Subject: [PATCH 33/54] Return `removed` more --- .../scala/cats/effect/CallbackStack.scala | 4 ++-- .../scala/cats/effect/CallbackStackSpec.scala | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index b86115c768..844faff696 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -194,12 +194,12 @@ private object CallbackStack { } else { if (next == null) { // bottomed out - 0 + removed } else { if (bound > 0) next.packTail(bound - 1, removed, this) else - 0 + removed } } } diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 0affa5271e..5de88b6875 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -30,18 +30,23 @@ class CallbackStackSpec extends BaseSpec { "handle race conditions in pack" in real { IO(CallbackStack[Unit](null)).flatMap { stack => - val pushClearPack = for { - handle <- IO(stack.push(_ => ())) + def pushClearPack(handle: CallbackStack.Handle[Unit]) = for { removed <- IO(stack.clearHandle(handle)) packed <- IO(stack.pack(1)) } yield (if (removed) 1 else 0) + packed - pushClearPack - .both(pushClearPack) - .productL(IO(stack.toString).flatMap(IO.println)) - .product(IO(stack.pack(1))) - .debug() - .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } + IO(stack.push(_ => ())) + .product(IO(stack.push(_ => ()))) + .flatMap { + case (handle1, handle2) => + // IO(stack.clearHandle(handle1)) *> + pushClearPack(handle1) + .both(pushClearPack(handle2)) + .productL(IO(stack.toString).flatMap(IO.println)) + .product(IO(stack.pack(1))) + .debug() + .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } + } .replicateA_(1000) .as(ok) } From 13115ff7e8bc486002f27d7e3a4a31859e3cbfcb Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:09:30 +0000 Subject: [PATCH 34/54] Tidy up the tests --- .../scala/cats/effect/CallbackStackSpec.scala | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 5de88b6875..24b8a8532d 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -23,30 +23,26 @@ class CallbackStackSpec extends BaseSpec { val stack = CallbackStack[Unit](null) val handle = stack.push(_ => ()) stack.push(_ => ()) - stack.clearHandle(handle) - stack.pack(1) must beEqualTo(1) + val removed = stack.clearHandle(handle) + if (removed) + stack.pack(1) must beEqualTo(0) + else + stack.pack(1) must beEqualTo(1) } "handle race conditions in pack" in real { IO(CallbackStack[Unit](null)).flatMap { stack => - def pushClearPack(handle: CallbackStack.Handle[Unit]) = for { + val pushClearPack = for { + handle <- IO(stack.push(_ => ())) removed <- IO(stack.clearHandle(handle)) packed <- IO(stack.pack(1)) } yield (if (removed) 1 else 0) + packed - IO(stack.push(_ => ())) - .product(IO(stack.push(_ => ()))) - .flatMap { - case (handle1, handle2) => - // IO(stack.clearHandle(handle1)) *> - pushClearPack(handle1) - .both(pushClearPack(handle2)) - .productL(IO(stack.toString).flatMap(IO.println)) - .product(IO(stack.pack(1))) - .debug() - .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } - } + pushClearPack + .both(pushClearPack) + .product(IO(stack.pack(1))) + .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } .replicateA_(1000) .as(ok) } From 7f168982480ff6cbb63ae9795997717ab12923a1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:12:34 +0000 Subject: [PATCH 35/54] Organize imports --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 844faff696..f56951f794 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -18,8 +18,7 @@ package cats.effect import scala.annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import CallbackStack.Handle import CallbackStack.Node From ea20d1d9a3bc77fa33be298908bfcee4981b9c4d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 21:17:10 +0000 Subject: [PATCH 36/54] Make `next` a `private[this]` --- .../main/scala/cats/effect/CallbackStack.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index f56951f794..c9499121ea 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -39,7 +39,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) @tailrec def loop(): Handle[A] = { val currentHead = head.get() - newHead.next = currentHead + newHead.setNext(currentHead) if (!head.compareAndSet(currentHead, newHead)) loop() @@ -76,7 +76,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) cb(a) invoked = true } - currentNode = currentNode.next + currentNode = currentNode.getNext() } invoked @@ -156,10 +156,16 @@ private object CallbackStack { private[CallbackStack] final class Node[A]( private[this] var callback: A => Unit ) extends Handle[A] { - var next: Node[A] = _ + private[this] var next: Node[A] = _ def getCallback(): A => Unit = callback + def getNext(): Node[A] = next + + def setNext(next: Node[A]): Unit = { + this.next = next + } + def clear(): Unit = { callback = null } @@ -182,7 +188,7 @@ private object CallbackStack { } } else { val prev = root.get() - if (prev.next eq this) { // prev is our new parent, we are its tail + if (prev.getNext() eq this) { // prev is our new parent, we are its tail this.packTail(bound, removed, prev) } else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail next.packTail(bound - 1, removed, this) @@ -213,8 +219,8 @@ private object CallbackStack { if (callback == null) { // We own the pack lock, so it is safe to write `next`. It will be published to subsequent packs via the lock. // Concurrent readers ie `CallbackStack#apply` may read a stale value for `next` still pointing to this node. - // This is okay b/c the new `next` (the tail) is still reachable via the old `next` (this node). - prev.next = next + // This is okay b/c the new `next` (this node's tail) is still reachable via the old `next` (this node). + prev.setNext(next) if (next == null) { // bottomed out removed + 1 From e9301e6c1d9fb484c54891282578eddcc52495a6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 22:41:29 +0000 Subject: [PATCH 37/54] Fix NPE, add test --- .../src/main/scala/cats/effect/CallbackStack.scala | 3 ++- .../src/test/scala/cats/effect/CallbackStackSpec.scala | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index c9499121ea..f28ebb1bcd 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -188,7 +188,8 @@ private object CallbackStack { } } else { val prev = root.get() - if (prev.getNext() eq this) { // prev is our new parent, we are its tail + if ((prev != null) && (prev.getNext() eq this)) { + // prev is our new parent, we are its tail this.packTail(bound, removed, prev) } else if (next != null) { // we were unable to remove ourselves, but we can still pack our tail next.packTail(bound - 1, removed, this) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 24b8a8532d..54a8df1b99 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -47,6 +47,15 @@ class CallbackStackSpec extends BaseSpec { .as(ok) } } + + "pack runs concurrently with clear" in real { + IO { + val stack = CallbackStack[Unit](null) + val handle = stack.push(_ => ()) + stack.clearHandle(handle) + stack + }.flatMap(stack => IO(stack.pack(1)).both(IO(stack.clear()))).replicateA_(1000).as(ok) + } } } From 9c12425ab9f6de2c61926d93e633bd3ee6d84115 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 22:55:06 +0000 Subject: [PATCH 38/54] Hush MiMa --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 3b6d213387..0a36f1d147 100644 --- a/build.sbt +++ b/build.sbt @@ -658,7 +658,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) "cats.effect.unsafe.WorkerThread.sleep"), // #3787, internal utility that was no longer needed ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk"), - ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$"), + // #3943, refactored internal private CallbackStack data structure + ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions From 9121e45de02e92ca6e60804cfa880266d613d2cc Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 22:59:45 +0000 Subject: [PATCH 39/54] Fix unused warning --- core/js/src/main/scala/cats/effect/CallbackStack.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index 51b3cde4ca..4f74f1f859 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -61,7 +61,10 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! - @inline def pack(bound: Int): Int = 0 + @inline def pack(bound: Int): Int = { + val _ = bound + 0 + } } private object CallbackStack { From 4563927a9a2b975e42dba84a23b8ca137a9149ae Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 23:13:54 +0000 Subject: [PATCH 40/54] More hushing --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 0a36f1d147..d0f3bd561e 100644 --- a/build.sbt +++ b/build.sbt @@ -660,7 +660,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk"), ProblemFilters.exclude[MissingClassProblem]("cats.effect.Thunk$"), // #3943, refactored internal private CallbackStack data structure - ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("cats.effect.CallbackStack.push"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "cats.effect.CallbackStack.currentHandle") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions From 82c686f13674e56e34ab3a09f49e37387d3d0f7f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 10 Jan 2024 23:39:50 +0000 Subject: [PATCH 41/54] Workaround weird unused warnings --- core/js/src/main/scala/cats/effect/CallbackStack.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/CallbackStack.scala b/core/js/src/main/scala/cats/effect/CallbackStack.scala index 4f74f1f859..b76eee490f 100644 --- a/core/js/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/js/src/main/scala/cats/effect/CallbackStack.scala @@ -61,10 +61,8 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni @inline def clear(): Unit = callbacks.length = 0 // javascript is crazy! - @inline def pack(bound: Int): Int = { - val _ = bound - 0 - } + @inline def pack(bound: Int): Int = + bound - bound // aka 0, but so bound is not unused ... } private object CallbackStack { From 9e3579b7827b9570196c75345b36f542611a2da8 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 11 Jan 2024 00:00:50 +0000 Subject: [PATCH 42/54] Even more hushing --- build.sbt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index d0f3bd561e..c7cd2a5cc2 100644 --- a/build.sbt +++ b/build.sbt @@ -819,7 +819,9 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[IncompatibleTemplateDefProblem]("cats.effect.CallbackStack"), // introduced by #3642, which optimized the BatchingMacrotaskExecutor ProblemFilters.exclude[MissingClassProblem]( - "cats.effect.unsafe.BatchingMacrotaskExecutor$executeBatchTaskRunnable$") + "cats.effect.unsafe.BatchingMacrotaskExecutor$executeBatchTaskRunnable$"), + // #3943, refactored internal private CallbackStack data structure + ProblemFilters.exclude[Problem]("cats.effect.CallbackStackOps.*") ) }, mimaBinaryIssueFilters ++= { From 35aa9a1ee6998783fcae9825ad7b14b080f82977 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 14 Jan 2024 00:22:13 +0000 Subject: [PATCH 43/54] Increase concurrency in `CallbackStackSpec` --- .../scala/cats/effect/CallbackStackSpec.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index 54a8df1b99..c65a6bf07e 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -16,7 +16,9 @@ package cats.effect -class CallbackStackSpec extends BaseSpec { +import cats.syntax.all._ + +class CallbackStackSpec extends BaseSpec with DetectPlatform { "CallbackStack" should { "correctly report the number removed" in { @@ -25,9 +27,9 @@ class CallbackStackSpec extends BaseSpec { stack.push(_ => ()) val removed = stack.clearHandle(handle) if (removed) - stack.pack(1) must beEqualTo(0) + stack.pack(1) mustEqual 0 else - stack.pack(1) must beEqualTo(1) + stack.pack(1) mustEqual 1 } "handle race conditions in pack" in real { @@ -40,10 +42,10 @@ class CallbackStackSpec extends BaseSpec { } yield (if (removed) 1 else 0) + packed pushClearPack - .both(pushClearPack) + .parReplicateA(3000) .product(IO(stack.pack(1))) - .flatMap { case ((x, y), z) => IO((x + y + z) must beEqualTo(2)) } - .replicateA_(1000) + .flatMap { case (xs, y) => IO((xs.sum + y) mustEqual 3000) } + .replicateA_(if (isJS || isNative) 1 else 1000) .as(ok) } } @@ -54,7 +56,7 @@ class CallbackStackSpec extends BaseSpec { val handle = stack.push(_ => ()) stack.clearHandle(handle) stack - }.flatMap(stack => IO(stack.pack(1)).both(IO(stack.clear()))).replicateA_(1000).as(ok) + }.flatMap(stack => IO(stack.pack(1)).both(IO(stack.clear()))).parReplicateA_(1000).as(ok) } } From a18138e359b12035388a5429ee0fda075bd6e3e5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 14 Jan 2024 00:25:46 +0000 Subject: [PATCH 44/54] Add comment/ref about data race in `CallbackStack#apply` --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index f28ebb1bcd..d59b7e30a3 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -59,7 +59,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) * iff *any* callbacks were invoked. */ def apply(a: A): Boolean = { - // TODO should we read allowedToPack for memory effect? + // see also note about data races in Node#packTail val cb = callback var invoked = if (cb != null) { From 5b3ad16a78b1fbb01b9effd0bf2c89be2168555e Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Jan 2024 00:37:51 +0000 Subject: [PATCH 45/54] Use `try`/`finally` to acquire/release pack lock --- .../src/main/scala/cats/effect/CallbackStack.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index d59b7e30a3..bdb01fd269 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -129,14 +129,15 @@ private final class CallbackStack[A](private[this] var callback: A => Unit) */ def pack(bound: Int): Int = if (allowedToPack.compareAndSet(true, false)) { - val got = head.get() - val rtn = + try { + val got = head.get() if (got ne null) got.packHead(bound, 0, this) else 0 - allowedToPack.set(true) - rtn + } finally { + allowedToPack.set(true) + } } else { 0 } From 108b6744ef3f10e392833c9a9eeb1c3c965a7ef8 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Jan 2024 11:01:28 -0800 Subject: [PATCH 46/54] Tweak release notes config [ci skip] Co-authored-by: Sam Pillsworth --- .github/release.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/release.yml b/.github/release.yml index 2c49e97bcd..89613f317a 100644 --- a/.github/release.yml +++ b/.github/release.yml @@ -1,18 +1,18 @@ changelog: categories: - - title: Features + - title: Enhancements labels: - ':mushroom: enhancement' - title: Bug Fixes labels: - ':beetle: bug' + - title: Documentation + labels: + - ':books: docs' - title: Behind the Scenes labels: - ':gear: infrastructure' - ':robot:' - - title: Documentation - labels: - - ':books: docs' # Not for published notes, just to make sure we don't forget any accidentally unlabeled PRs - title: Uncategorized labels: From 3e9f22e1202cfa2b0d79e6de3bf3f0979ca6e82a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Jan 2024 19:53:40 +0000 Subject: [PATCH 47/54] Decrease concurrency/replicas in `CallbackStackSpec` Co-authored-by: Sam Pillsworth --- .../src/test/scala/cats/effect/CallbackStackSpec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index c65a6bf07e..fdf5a5ab41 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -41,10 +41,12 @@ class CallbackStackSpec extends BaseSpec with DetectPlatform { packed <- IO(stack.pack(1)) } yield (if (removed) 1 else 0) + packed + val concurrency = Math.max(2, Runtime.getRuntime().availableProcessors()) + pushClearPack - .parReplicateA(3000) + .parReplicateA(concurrency) .product(IO(stack.pack(1))) - .flatMap { case (xs, y) => IO((xs.sum + y) mustEqual 3000) } + .flatMap { case (xs, y) => IO((xs.sum + y) mustEqual concurrency) } .replicateA_(if (isJS || isNative) 1 else 1000) .as(ok) } From 976ac3fd3d06017889e186132423a70bf80d582e Mon Sep 17 00:00:00 2001 From: Sam Pillsworth Date: Mon, 15 Jan 2024 15:05:49 -0500 Subject: [PATCH 48/54] Update release.yml --- .github/release.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/release.yml b/.github/release.yml index 89613f317a..1ce01cf9ec 100644 --- a/.github/release.yml +++ b/.github/release.yml @@ -13,6 +13,7 @@ changelog: labels: - ':gear: infrastructure' - ':robot:' + - ':art: backstage' # Not for published notes, just to make sure we don't forget any accidentally unlabeled PRs - title: Uncategorized labels: From e18e6d07a875c76f6278c5384b5fccb664c1903b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Jan 2024 13:16:36 -0800 Subject: [PATCH 49/54] Document release process Co-authored-by: Sam Pillsworth --- RELEASING.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 RELEASING.md diff --git a/RELEASING.md b/RELEASING.md new file mode 100644 index 0000000000..dc61fab879 --- /dev/null +++ b/RELEASING.md @@ -0,0 +1,27 @@ +# Release procedure + +1. Draft a new release. +1. Create a new tag with the appropriate name (e.g. v3.5.1); also name the release accordingly. +1. Make sure you're targeting the appropriate branch! (probably the minor branch) +1. "Generate new release notes" +1. edit edit edit + - Increment and transliterate the release counter + - Cargo-cult the old release notes header from the last minor release (including any warnings) + - Add any special notes to the release header (any major changes or fixes that need calling out) + - Fix up any formatting or PRs that got sorted to the wrong category + - Double-check PR attributions (collaborations, hand-offs, etc.) + - Just make it look nice :) +1. Publish the release. +1. Wait for all the CI madness to happen, for the release to announced to Discord, and for the artifacts to sync to Maven Central. +1. Make sure you're locally updated and on the right major/minor branch (this is the same branch as step 3). +1. Open a PR to merge the minor branch into the major branch. This is only necessary for patch releases. + + `scripts/make-release-prs.sh ` + + e.g. `scripts/make-release-prs.sh v3.5.1 v3.5.2` + +1. Open a PR to update the version in the README and documentation site. This is only necessary for stable releases (i.e., not Milestones or Release Candidates) + + `scripts/make-site-pr.sh ` + + e.g. `scripts/make-site-pr.sh v3.5.1 v3.5.2` From 02a0390fa0ee472f4f051047d81532df1e8b72f5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Jan 2024 16:05:59 -0800 Subject: [PATCH 50/54] Update versions for 3.5.3 --- README.md | 14 +++++++------- docs/core/native-image.md | 2 +- docs/core/scala-native.md | 2 +- docs/core/test-runtime.md | 2 +- docs/faq.md | 2 +- docs/getting-started.md | 4 ++-- docs/migration-guide.md | 8 ++++---- docs/std/mapref.md | 2 +- docs/std/ref.md | 2 +- docs/tutorial.md | 4 ++-- 10 files changed, 21 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 785f456725..bfe0f4b4f5 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,11 @@ ## Getting Started -- Wired: **3.5.2** +- Wired: **3.5.3** - Tired: **2.5.5** (end of life) ```scala -libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.2" +libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.3" ``` The above represents the core, stable dependency which brings in the entirety of Cats Effect. This is *most likely* what you want. All current Cats Effect releases are published for Scala 2.12, 2.13, 3.0, and Scala.js 1.7. @@ -30,22 +30,22 @@ Depending on your use-case, you may want to consider one of the several other mo ```scala libraryDependencies ++= Seq( - "org.typelevel" %% "cats-effect-kernel" % "3.5.2", - "org.typelevel" %% "cats-effect-laws" % "3.5.2" % Test) + "org.typelevel" %% "cats-effect-kernel" % "3.5.3", + "org.typelevel" %% "cats-effect-laws" % "3.5.3" % Test) ``` If you're a middleware framework (like [Fs2](https://fs2.io/)), you probably want to depend on **std**, which gives you access to `Queue`, `Semaphore`, and much more without introducing a hard-dependency on `IO` outside of your tests: ```scala libraryDependencies ++= Seq( - "org.typelevel" %% "cats-effect-std" % "3.5.2", - "org.typelevel" %% "cats-effect" % "3.5.2" % Test) + "org.typelevel" %% "cats-effect-std" % "3.5.3", + "org.typelevel" %% "cats-effect" % "3.5.3" % Test) ``` You may also find some utility in the **testkit** and **kernel-testkit** projects, which contain `TestContext`, generators for `IO`, and a few other things: ```scala -libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.2" % Test +libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.3" % Test ``` Cats Effect provides backward binary compatibility within the 2.x and 3.x version lines, and both forward and backward compatibility within any major/minor line. This is analogous to the versioning scheme used by Cats itself, as well as other major projects such as Scala.js. Thus, any project depending upon Cats Effect 2.2.1 can be used with libraries compiled against Cats Effect 2.0.0 or 2.2.3, but *not* with libraries compiled against 2.3.0 or higher. diff --git a/docs/core/native-image.md b/docs/core/native-image.md index 0fefa4e45d..3a29e30c9a 100644 --- a/docs/core/native-image.md +++ b/docs/core/native-image.md @@ -33,7 +33,7 @@ ThisBuild / scalaVersion := "2.13.8" lazy val root = (project in file(".")).enablePlugins(NativeImagePlugin).settings( name := "cats-effect-3-hello-world", - libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.2", + libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.3", Compile / mainClass := Some("com.example.Main"), nativeImageOptions += "--no-fallback", nativeImageVersion := "22.1.0" // It should be at least version 21.0.0 diff --git a/docs/core/scala-native.md b/docs/core/scala-native.md index 7bceac890a..caaa355101 100644 --- a/docs/core/scala-native.md +++ b/docs/core/scala-native.md @@ -22,7 +22,7 @@ lazy val root = project.in(file(".")) .enablePlugins(ScalaNativePlugin) .settings( name := "cats-effect-3-hello-world", - libraryDependencies += "org.typelevel" %%% "cats-effect" % "3.5.2", + libraryDependencies += "org.typelevel" %%% "cats-effect" % "3.5.3", Compile / mainClass := Some("com.example.Main") ) diff --git a/docs/core/test-runtime.md b/docs/core/test-runtime.md index a6c8746e4c..cb5adfb29d 100644 --- a/docs/core/test-runtime.md +++ b/docs/core/test-runtime.md @@ -28,7 +28,7 @@ For those migrating code from Cats Effect 2, `TestControl` is a considerably mor In order to use `TestControl`, you will need to bring in the **cats-effect-testkit** dependency: ```scala -libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.2" % Test +libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.3" % Test ``` ## Example diff --git a/docs/faq.md b/docs/faq.md index 2720afe6ab..4a7c1b40e6 100644 --- a/docs/faq.md +++ b/docs/faq.md @@ -9,7 +9,7 @@ title: FAQ ```scala-cli //> using scala "2.13.8" -//> using lib "org.typelevel::cats-effect::3.5.2" +//> using lib "org.typelevel::cats-effect::3.5.3" import cats.effect._ diff --git a/docs/getting-started.md b/docs/getting-started.md index eff54fead0..4241e338dd 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -6,7 +6,7 @@ title: Getting Started Add the following to your **build.sbt**: ```scala -libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.2" +libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.3" ``` Naturally, if you're using ScalaJS, you should replace the double `%%` with a triple `%%%`. If you're on Scala 2, it is *highly* recommended that you enable the [better-monadic-for](https://github.com/oleg-py/better-monadic-for) plugin, which fixes a number of surprising elements of the `for`-comprehension syntax in the Scala language: @@ -62,7 +62,7 @@ We will learn more about constructs like `start` and `*>` in later pages, but fo Of course, the easiest way to play with Cats Effect is to try it out in a Scala REPL. We recommend using [Ammonite](https://ammonite.io/#Ammonite-REPL) for this kind of thing. To get started, run the following lines (if not using Ammonite, skip the first line and make sure that Cats Effect and its dependencies are correctly configured on the classpath): ```scala -import $ivy.`org.typelevel::cats-effect:3.5.2` +import $ivy.`org.typelevel::cats-effect:3.5.3` import cats.effect.unsafe.implicits._ import cats.effect.IO diff --git a/docs/migration-guide.md b/docs/migration-guide.md index 33eae63039..e4d19a705c 100644 --- a/docs/migration-guide.md +++ b/docs/migration-guide.md @@ -81,9 +81,9 @@ Cats Effect 3 splits the code dependency into multiple modules. If you were prev The current non-test modules are: ```scala -"org.typelevel" %% "cats-effect-kernel" % "3.5.2", -"org.typelevel" %% "cats-effect-std" % "3.5.2", -"org.typelevel" %% "cats-effect" % "3.5.2", +"org.typelevel" %% "cats-effect-kernel" % "3.5.3", +"org.typelevel" %% "cats-effect-std" % "3.5.3", +"org.typelevel" %% "cats-effect" % "3.5.3", ``` - `kernel` - type class definitions, simple concurrency primitives @@ -96,7 +96,7 @@ The current non-test modules are: libraryDependencies ++= Seq( //... - "org.typelevel" %% "cats-effect" % "2.4.0", -+ "org.typelevel" %% "cats-effect" % "3.5.2", ++ "org.typelevel" %% "cats-effect" % "3.5.3", //... ) ``` diff --git a/docs/std/mapref.md b/docs/std/mapref.md index 54535d2411..cff4da3eb8 100644 --- a/docs/std/mapref.md +++ b/docs/std/mapref.md @@ -32,7 +32,7 @@ as long as their keys belong to different shards. This is probably one of the most common uses of this datatype. ```scala mdoc:reset:silent -//> using lib "org.typelevel::cats-effect::3.5.2" +//> using lib "org.typelevel::cats-effect::3.5.3" import cats.effect.IO import cats.effect.std.MapRef diff --git a/docs/std/ref.md b/docs/std/ref.md index b4c7cc5892..35f3c7c62a 100644 --- a/docs/std/ref.md +++ b/docs/std/ref.md @@ -33,7 +33,7 @@ This is probably one of the most common uses of this concurrency primitive. In this example, the workers will concurrently run and update the value of the `Ref`. ```scala mdoc:reset:silent -//> using lib "org.typelevel::cats-effect::3.5.2" +//> using lib "org.typelevel::cats-effect::3.5.3" import cats.effect.{IO, IOApp, Sync} import cats.effect.kernel.Ref diff --git a/docs/tutorial.md b/docs/tutorial.md index ffdf9c3e87..471de56946 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -42,11 +42,11 @@ running the code snippets in this tutorial, it is recommended to use the same ```scala name := "cats-effect-tutorial" -version := "3.5.2" +version := "3.5.3" scalaVersion := "2.13.6" -libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.2" withSources() withJavadoc() +libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.3" withSources() withJavadoc() scalacOptions ++= Seq( "-feature", From 07d0b9cc26f054e7f0a8bd1fa8fc4f295dc5b736 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 16 Jan 2024 02:15:43 +0000 Subject: [PATCH 51/54] Fix compile --- .../main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index e62d1d6147..aad24a5479 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -527,7 +527,7 @@ private[effect] final class WorkStealingThreadPool[P]( */ private[effect] def prepareForBlocking(): Unit = { val thread = Thread.currentThread() - val worker = thread.asInstanceOf[WorkerThread] + val worker = thread.asInstanceOf[WorkerThread[_]] worker.prepareForBlocking() } From 3b5325d2df0cb856cdad09e94b95d75351093912 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 16 Jan 2024 02:21:18 +0000 Subject: [PATCH 52/54] Fix compile --- .../src/test/scala/cats/effect/CallbackStackSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala index fdf5a5ab41..151d5bf3d0 100644 --- a/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala @@ -22,7 +22,7 @@ class CallbackStackSpec extends BaseSpec with DetectPlatform { "CallbackStack" should { "correctly report the number removed" in { - val stack = CallbackStack[Unit](null) + val stack = CallbackStack.of[Unit](null) val handle = stack.push(_ => ()) stack.push(_ => ()) val removed = stack.clearHandle(handle) @@ -34,7 +34,7 @@ class CallbackStackSpec extends BaseSpec with DetectPlatform { "handle race conditions in pack" in real { - IO(CallbackStack[Unit](null)).flatMap { stack => + IO(CallbackStack.of[Unit](null)).flatMap { stack => val pushClearPack = for { handle <- IO(stack.push(_ => ())) removed <- IO(stack.clearHandle(handle)) @@ -54,7 +54,7 @@ class CallbackStackSpec extends BaseSpec with DetectPlatform { "pack runs concurrently with clear" in real { IO { - val stack = CallbackStack[Unit](null) + val stack = CallbackStack.of[Unit](null) val handle = stack.push(_ => ()) stack.clearHandle(handle) stack From 311d406d9adc7143206533ca88d9242a99f47938 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 16 Jan 2024 02:33:32 +0000 Subject: [PATCH 53/54] Organize imports --- core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala index 7043b5de4a..37c10ced94 100644 --- a/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala +++ b/core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import CallbackStack.Handle import CallbackStack.Node - import Platform.static private final class CallbackStack[A](private[this] var callback: A => Unit) From a608390ea642b7ba114bc5bc8a18e56c2be884c4 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 16 Jan 2024 02:46:10 +0000 Subject: [PATCH 54/54] Fix warning --- core/shared/src/main/scala/cats/effect/IODeferred.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IODeferred.scala b/core/shared/src/main/scala/cats/effect/IODeferred.scala index 13c05d2226..a64481b910 100644 --- a/core/shared/src/main/scala/cats/effect/IODeferred.scala +++ b/core/shared/src/main/scala/cats/effect/IODeferred.scala @@ -29,9 +29,10 @@ private final class IODeferred[A] extends Deferred[IO, A] { val removed = callbacks.clearHandle(handle) if (!removed) { val clearCount = clearCounter.incrementAndGet() - if ((clearCount & (clearCount - 1)) == 0) // power of 2 + if ((clearCount & (clearCount - 1)) == 0) { // power of 2 clearCounter.addAndGet(-callbacks.pack(clearCount)) - () + () + } } }