Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated & revised Future and Promise implementation #6610

Merged
merged 1 commit into from
Aug 18, 2018

Conversation

viktorklang
Copy link
Contributor

@viktorklang viktorklang commented May 9, 2018

Introduces,
Future.delegate[A](=> Future[A])(implicit ec: ExecutionContext): Future[A]

  • this allows for construction of Futures which is delegated
    onto the supplied ExecutionContext
  • Equivalent to: Future.unit.flatMap(_ => expr)

Makes Futures correctly handle RejectedExecutionExceptions,
from ExecutionContext.execute.

Makes Futures correctly handle Thread.interrupt()'s.

Drastically improves performance for most, if not all, combinators.

Introduces a different implementation of Linking, with dedicated link
nodes—which are GC:ed upon completion.

Introduces a different implementation of the transformations, instead of
relying primarily on transformWith or onComplete, the DefaultPromise
specializes practically all combinators.

Introduces a different implementation of the callback management,
making concats O(1) at the expense of stack space at completion.

@scala-jenkins scala-jenkins added this to the 2.13.0-M5 milestone May 9, 2018
@viktorklang
Copy link
Contributor Author

@viktorklang
Copy link
Contributor Author

Ping @bantonsson @H3NK3

link(target)
} else if (state.isInstanceOf[Link[T]]) {
// TODO: does relinking ever make sense?
state.asInstanceOf[Link[T]].relink(link = state.asInstanceOf[Link[T]], target = target, owner = this)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richdougherty Rich, I'd love to get your input on this. Does re-linking ever make sense really? Callbacks have already been migrated, so re-pointing it to somewhere else seems like a problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@richdougherty richdougherty May 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pasted the old comments below. Is this new relinking code the same as the old code which breaks/compresses long chains of promises and gets them all to point at a root promise?

-   * The problem of leaks is solved by automatically breaking these chains of
-   * promises, so that promises don't refer to each other in a long chain. This
-   * allows each promise to be individually collected. The idea is to "flatten"
-   * the chain of promises, so that instead of each promise pointing to its
-   * neighbour, they instead point directly the promise at the root of the
-   * chain. This means that only the root promise is referenced, and all the
-   * other promises are available for garbage collection as soon as they're no
-   * longer referenced by user code. 
...
-   * To mitigate the problem of the root promise changing, whenever a promise's
-   * methods are called, and it needs a reference to its root promise it calls
-   * the `compressedRoot()` method. This method re-scans the promise chain to
-   * get the root promise, and also compresses its links so that it links
-   * directly to whatever the current root promise is. This ensures that the
-   * chain is flattened whenever `compressedRoot()` is called. And since
-   * `compressedRoot()` is called at every possible opportunity (when getting a
-   * promise's value, when adding an onComplete handler, etc), this will happen
-   * frequently. Unfortunately, even this eager relinking doesn't absolutely
-   * guarantee that the chain will be flattened and that leaks cannot occur.
-   * However eager relinking does greatly reduce the chance that leaks will
-   * occur.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richdougherty It's not the same, but it is based on that code. The new implementation correctly unlinks in case of completion, to not leave links post completion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richdougherty But still: Does switching one link to another really make sense? Callbacks have already been migrated to another promise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which cases would links remain chained together?

If there's a chance of building a long (possibly infinite) chain of links then that might be worth addressing.

Does the length of the chain affect the performance of adding a callback to the end of the chain? I.e. does the callback logic ever have to traverse a long chain? If so, it would be quicker to keep the chain short.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richdougherty Perhaps we're talking past each other. I'm talking about the case where a Future is linked to one logical chain and then gets linked to another logical chain. I.e. linkRootOf being invoked twice on the same DefaultPromise, but for different logical chains.

My question is: should it only be possible to call linkRootOf once for a DefaultPromise?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original code linkRootOf could be invoked twice.

// from old code
def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = {
  val p = new DefaultPromise[S]()
  onComplete { 
    ...
    case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
    ...
  }
  p.future
}

// example of linking twice
val rootPromise = Promise()
val root = rootPromise.future // Actually a DefaultPromise

fut1.transformWith(_ => root) // root.linkRootOf(...) call #1
fut2.transformWith(_ => root) // root.linkRootOf(...) call #2

@@ -39,79 +40,76 @@ import scala.annotation.tailrec
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
*/
private[concurrent] trait BatchingExecutor extends Executor {
trait BatchingExecutor extends Executor {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% decided on making this public, thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the potential downsides?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usage?:)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep it private since the implementation involves mutable state and people might try to extend it without understanding the implications. Advanced users can extend from Executor directly and implement something similar tailored to their needs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fwbrasil True. Moved it back to private.

case t: Throwable =>
parentBlockContext = null // Need to reset this before re-submitting it
_tasksLocal.remove() // If unbatchedExecute runs synchronously
unbatchedExecute(this) //TODO what if this submission fails?
Copy link
Contributor

@NthPortal NthPortal May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could potentially do

try unbatchedExecute(this)
catch {
  case inner: Throwable if NonFatal(t) && !NonFatal(inner) =>
    inner.addSuppressed(t)
    throw inner
  case inner: Throwable => t.addSuppressed(inner)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that doesn't play well with cached exception instances though. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few possibilities I've come up with:

  • if you're not concerned about unbatchedExecute throwing a cached exception instance, you could always addSuppressed to that one, even if it would in general make more sense to rethrow the original one
  • construct a new exception with one as the cause and the other suppressed. I'm not super happy with this, but it certainly solves the cached exception problem

Copy link
Contributor Author

@viktorklang viktorklang May 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal How about this:

    private[this] final def handleRunFailure(cause: Throwable): Unit =
      if (NonFatal(cause) || cause.isInstanceOf[InterruptedException]) {
        try unbatchedExecute(this) catch {
          case inner: Throwable =>
            if (NonFatal(inner)) {
              val e = new ExecutionException("Non-fatal error occurred and resubmission failed, see suppressed exception.", cause)
              e.addSuppressed(inner)
              throw e
            } else throw inner // inner exception is fatal, throw it
        }
      } else throw cause // throw fatals

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTOH it needs some extra consideration w.r.t. Interruptions. Thinking…

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will work @NthPortal:

    private[this] final def handleRunFailure(cause: Throwable): Nothing =
      if (NonFatal(cause) || cause.isInstanceOf[InterruptedException]) {
        try unbatchedExecute(this) catch {
          case inner: Throwable =>
            if (NonFatal(inner)) {
              val e = new ExecutionException("Non-fatal error occurred and resubmission failed, see suppressed exception.", cause)
              e.addSuppressed(inner)
              throw e
            } else throw inner
        }
        throw cause
      } else throw cause

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rationale: original exception will be rethrown if it is fatal or if unbatchedExecute is successful.
If unbatchedExecute throws a Fatal (includes InterruptedException) then just rethrow that, but if not, then both the original exception (cause) and the inner exception is wrappen in a new exception and thrown

case Failure(t) => Try(throw f(t)) // will throw fatal errors!
t =>
if (t.isInstanceOf[Success[T]]) t map s
else throw f(t.asInstanceOf[Failure[T]].exception) // will throw fatal errors!
Copy link
Contributor

@NthPortal NthPortal May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit: I am imagining that all these asInstanceOfs and isInstanceOfs are going to cause unchecked warnings - will probably need to import scala.{unchecked => uc} and throw a bunch of @ucs around.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why is isInstanceOf used? wouldn't it be cheaper to do a check like

if t.isSuccess() => t map s
else throw f (t.asInstanceOf[Failure[T]].exception)

unless of course the isSuccess method has been removed in 2.13

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@javax-swing @NthPortal The answer is performance :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(good to comment these little performance hacks so someone doesn't innocently regress them later)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is that isInstanceOf translates to a single instanceof bytecode, while isSuccess is a method call which (a) allocates a stack frame and (b) has more than a single bytecode of body. Someone with more knowledge can correct me though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viktorklang If you want, I can add the @unchecked annotations this weekend, and either force push or push a commit labeled "[squash]" to your branch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal AFAIK there are no unchecked warnings, can you confirm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SethTisue Basically the entire file is a big performance "hack". I can add a disclaimer in the beginning of the file?

Copy link
Contributor

@NthPortal NthPortal May 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viktorklang you're right; I was mistaken. I thought isInstanceOf[Link[T]] would warn because T is erased, but apparently not.

final def defer[T](body: => Future[T])(implicit executor: ExecutionContext): Future[T] =
unit.flatMap(_ => body)

/** Simple version of `Future.traverse`. Asynchronously and non-blockingly transforms, in essence, a `IterableOnce[Future[A]]`
Copy link
Contributor

@NthPortal NthPortal May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "in essence"?

Also, should be "an `IterableOnce[Future[A]]`", I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed a way of saying "a subtype of ItrableOnce to the same subtype of IterableOnce". Suggest alternative?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been thinking about this, and have not yet come up with an alternative (other than saying exactly what you said: "a subtype of ..."). I will continue to think on it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Any suggestion? :)

Copy link
Contributor

@nafg nafg Aug 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line really predates this change. That said, personally I would go for something less technical, like "transforms a collection of futures into a single future of a collection of the results of all the original futures (using the same collection type)"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nafg How about:

transforms a collection of futures into a single future of a collection (with the same type as the original) of the results of all the futures.`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine by me

override def apply(v1: Try[T]): Unit = {
val r = getAndSet(null)
if (r ne null)
r tryComplete v1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of the AtomicReference is so that it doesn't try to complete the Promise repeatedly.
If so, why is this r tryComplete v1 and not r complete v1?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, seems like there are conflicting ideas here. if using tryComplete then the code could be simplified to.

futures.iterator().foreach { _.onComplete(p.tryComplete) }

since promises are atomic refs anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the AtomicReference is not new—see History for background.
tryComplete is used because it is cheaper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha. An inline comment "// cheaper than `complete`" might be helpful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal complete is tryComplete + conditional throw

throw new IllegalStateException("problem in scala.concurrent internal callback", t)
private[concurrent] final object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor with BatchingExecutor {
override protected final def unbatchedExecute(r: Runnable): Unit = r.run()
override final def reportFailure(t: Throwable): Unit = ExecutionContext.defaultReporter(t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for no longer throwing the IllegalStateException? Are there internal callbacks which may throw exceptions now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation for not throwing is that it could lead to broken code (callbacks not being executed). And I didn't want to throw a Fatal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to report the exception wrapped in an IllegalStateException, since this isn't supposed to happen? e.g.

override final def reportFailure(t: Throwable): Unit =
  ExecutionContext.defaultReporter(new IllegalStateException("problem in scala.concurrent internal callback", t)

either that or report both

override final def reportFailure(t: Throwable): Unit = {
  ExecutionContext.defaultReporter(new IllegalStateException("problem in scala.concurrent internal callback"/*, t*/)
  ExecutionContext.defaultReporter(t)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Great idea!

*
* @return This promise
*/
@deprecated("Since this method is semantically equivalent to `completeWith`, use that instead.", "2.13")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be since "2.13.0"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* @return This promise
*/
@deprecated("Since this method is semantically equivalent to `completeWith`, use that instead.", "2.13")
final def tryCompleteWith(other: Future[T]): this.type = completeWith(other)
Copy link
Contributor

@NthPortal NthPortal May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious as to why completeWith get chosen as the name to keep, and not tryCompleteWith

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Great question! My rationale was that completeWith is by necessity not always immediate, hence completeWith becomes an attempt by its very nature. I'm open to being convinced otherwise! :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is right now, there are two pairs of methods: (tryC|c)omplete and (tryC|c)ompleteWith; my first intuition is that the methods of each pair should behave "the same", so to speak.

For the first pair, tryComplete does nothing if it fails, while complete throws an exception. I would expect then that tryCompleteWith does nothing when it fails (which is correct), and that completeWith throws an exception (which is not correct). Obviously, there is no way to have completeWith throw an exception, because it fails on a different thread, but its behaviour is still less immediately intuitive to me. I feel that tryCompleteWith better indicates what it does, and mirrors tryComplete more.

I would love to get broader feedback on what others feel is the more intuitive name to them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Sounds good. Let's see if there's more input from others here.

final override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
final override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
final override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
final override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
Copy link
Contributor

@NthPortal NthPortal May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does final add if it's an anonymous class and can't be extended?

Edit: I'm not saying it doesn't do anything, just that I don't know what that is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal In case it is moved out, I want(ed) to make sure that it stays final.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough 👍

* @param executor the execution context on which the `body` is evaluated in
* @return the `Future` holding the result of the computation
*/
final def defer[T](body: => Future[T])(implicit executor: ExecutionContext): Future[T] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a use case that motivates this method? Is it for cases in which body is expensive to execute? If so

  1. I think the scaladoc should have a note about that
  2. I'm concerned about encouraging such behaviour. If body takes a significant amount of time to execute, it should be in a blocking block, and probably not on the global ExecutionContext; however, neither of those things are indicated to be necessary by the name defer

To me, defer implies that we're waiting to create the Future result until later. However

  • if the ExecutionContext's queue is empty, it will effectively happen immediately
  • if body uses the same ExecutionContext as defer, it is just throwing it on the ExecutionContext's queue twice, which seems... weird and not useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal The method defers execution to the given ExecutionContext. It is less about expensive, and more about controlling where things get executed. No matter the method, if there is blocking, then wrapping in blocking{} should be encouraged.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer doesn't seem a good name since people might think that'll postpone the execution. Maybe fork or flatten?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fwbrasil flatApply? :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like delegate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "defer" is too misleading, not that it's technically wrong, but people will misunderstand it anyway.

What is a concrete example use case? And why isn't Future { makeFuture() }.flatten good enough?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, will it be renamed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nafg it is performance-wise more along the lines of Future.unit.flatMap(_ => expr) but that might be hard for people to find out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anyone object to delegate, which @viktorklang suggested a couple months ago?

Copy link
Contributor Author

@viktorklang viktorklang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case Failure(t) => Try(throw f(t)) // will throw fatal errors!
t =>
if (t.isInstanceOf[Success[T]]) t map s
else throw f(t.asInstanceOf[Failure[T]].exception) // will throw fatal errors!
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@javax-swing @NthPortal The answer is performance :)

* @param executor the execution context on which the `body` is evaluated in
* @return the `Future` holding the result of the computation
*/
final def defer[T](body: => Future[T])(implicit executor: ExecutionContext): Future[T] =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal The method defers execution to the given ExecutionContext. It is less about expensive, and more about controlling where things get executed. No matter the method, if there is blocking, then wrapping in blocking{} should be encouraged.

final def defer[T](body: => Future[T])(implicit executor: ExecutionContext): Future[T] =
unit.flatMap(_ => body)

/** Simple version of `Future.traverse`. Asynchronously and non-blockingly transforms, in essence, a `IterableOnce[Future[A]]`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed a way of saying "a subtype of ItrableOnce to the same subtype of IterableOnce". Suggest alternative?

override def apply(v1: Try[T]): Unit = {
val r = getAndSet(null)
if (r ne null)
r tryComplete v1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the AtomicReference is not new—see History for background.
tryComplete is used because it is cheaper.

throw new IllegalStateException("problem in scala.concurrent internal callback", t)
private[concurrent] final object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor with BatchingExecutor {
override protected final def unbatchedExecute(r: Runnable): Unit = r.run()
override final def reportFailure(t: Throwable): Unit = ExecutionContext.defaultReporter(t)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation for not throwing is that it could lead to broken code (callbacks not being executed). And I didn't want to throw a Fatal.

*
* @return This promise
*/
@deprecated("Since this method is semantically equivalent to `completeWith`, use that instead.", "2.13")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* @return This promise
*/
@deprecated("Since this method is semantically equivalent to `completeWith`, use that instead.", "2.13")
final def tryCompleteWith(other: Future[T]): this.type = completeWith(other)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Great question! My rationale was that completeWith is by necessity not always immediate, hence completeWith becomes an attempt by its very nature. I'm open to being convinced otherwise! :-)

final override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
final override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
final override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
final override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal In case it is moved out, I want(ed) to make sure that it stays final.

Copy link
Contributor

@NthPortal NthPortal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still working through the new Promise implementation, and may have more comments later. The main issue I see is that the old implementation was at least somewhat well documented (admittedly I am not very familiar with it), but this one is not. Consequently, it is much more work to trace the code and understand what it does/how it does it.

*/
// Left non-final to enable addition of extra fields by Java/Scala converters
// in scala-java8-compat.
class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old implementation has a large comment explaining its structure and how it worked, and the new one does not have such a comment. I think having the implementation explained well is very important, both for maintainability and to help others understand what is going on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal I completely agree. I'll make a draft documentation to help maintainers.

private[this] final var _arg: AnyRef,
private[this] final val _xform: Byte
) extends DefaultPromise[T]() with Callbacks[F] with Runnable with OnCompleteRunnable {
final def this(xform: Int, f: _ => _, ec: ExecutionContext) = this(f.asInstanceOf[Any => Any], ec.prepare(): AnyRef, xform.asInstanceOf[Byte])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a comment explaining why using an untyped function is required, (as well as what a Transformation is in general). I don't like giving up entirely on type safety without good reason - and I'm not doubting that there is a good reason - but it would be helpful to know what that is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Will add documentation. I had several drafts which preserved as much type information as possible, but it quickly became either too expensive (memory or indirection) or too unwieldly (parametrization). I can try with a type member approach to see if that works out better.

In general Transformation is a structure which encodes its own dispatch table to control its monomorphicity (inlineability, cheap type tests etc)

private[this] final var _arg: AnyRef,
private[this] final val _xform: Byte
) extends DefaultPromise[T]() with Callbacks[F] with Runnable with OnCompleteRunnable {
final def this(xform: Int, f: _ => _, ec: ExecutionContext) = this(f.asInstanceOf[Any => Any], ec.prepare(): AnyRef, xform.asInstanceOf[Byte])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Int.asInstanceOf[Byte] actually faster than Int.toByte? I would have guessed that they both compile to an i2b.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal You're right, but I didn't want to rely on compiler implementation. I can change it, it's easy to start to obsess when performance optimizing.

final val Xform_recoverWith = 8
final val Xform_filter = 9
final val Xform_collect = 10
//final val Xform_failed = 11
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume having these as 0.toByte etc. doesn't compile them to constants as efficiently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Yeah, it doesn't compile them to constants. :(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that ascribing the type to the declaration would still produce a literal ie:

final val Xform_noop: Byte = 0

Would be a constant byte literal. I could be wrong about that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It becomes a term of type Byte, defined with the literal "0". The inliner only inlines constant vals of literal types, which Byte isn't.

The fix for this was discussed within SIP-23 (aka 42.type) but it was dropped in the final SIP: https://docs.scala-lang.org/sips/42.type.html#byte-and-short-literals.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bummer. Thanks for the helpful info.

case linked: DefaultPromise[_] => compressedRoot(linked)
case _ => this
}
override final def filter(@deprecatedName('pred) p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the @deprecatedName?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suspicion is that you forked before those were removed in #6319

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Great catch, I thought I had found them all. I'll fix.

private[this] final def resolve[T](value: Try[T]): Try[T] =
if (requireNonNull(value).isInstanceOf[Success[T]]) value
else {
val t = value.asInstanceOf[Failure[T]].exception
Copy link
Contributor

@NthPortal NthPortal May 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we should keep requireNonNull just in case, but it seems to me that since null.isInstanceOf[_] is always false, and the other path immediately invokes .exception on value, a null completion will not get through, and requireNonNull is not paramount

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Exactly—we could rely on .exception to throw the NPE but it would seem like a bug (and would be brittle). Also requireNonNull can possibly be optimized by theJVM.

override final def failed: Future[Throwable] = {
val state = get()
if (!state.isInstanceOf[Success[T]]) super.failed
//dispatchOrAddCallbacks(state, new Transformation[T, Throwable](Xform_failed, Future.failedFun[T], InternalCallbackExecutor)) // Short-circuit if we get a Failure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the status of this commented-out line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Nice catch—it's a leftover from me experimenting what operations made sense to overload.


l.result
}
} else throw new IllegalArgumentException("Cannot wait for Undefined duration of time")

@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it was my impression that @throws[TimeoutException] was preferred now, but I could be mistaken

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal I can definitely switch to that!

final def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
val v = tryAwait0(atMost)
if (v ne null) this
else throw new TimeoutException("Future timed out after [" + atMost + "]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be reasonable to move the TimeoutException into tryAwait0?

val executor = _arg.asInstanceOf[ExecutionContext]
try {
_arg = resolved
executor.execute(this) // Safe publication of _arg = v (and _fun)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was resolved previously called v? Regardless, the comments need updating, as there is no v here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Nice catch!

*/
val unit: Future[Unit] = successful(())
final val unit: Future[Unit] = fromTry(successOfUnit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious to me what the motivation for this change is. I don't object to it particularly, but it looks perhaps slightly less readable, and it's only executed once regardless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're reusing the same allocation to represent the successOfUnit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, I missed the other places it was used, but I see now

else {
val t = Thread.currentThread
if (t.isInstanceOf[BlockContext]) t.asInstanceOf[BlockContext]
else DefaultBlockContext
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little depressing that a match can't be used because this is faster

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Yeah. For simple things like these, match is ~16 ops and if+typecheck+cast is ~10 ops.

Shouldn't have to be that way IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viktorklang - Did you actually microbenchmark it? I generally don't find a substantial difference once the JIT compiler gets through with it. It's just a store/load instruction pair. Shouldn't be hard to optimize away.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ichoran Yeah, the difference was noticeable on my machine. However, of course type of machine, type of JVM etc will make it differ. I'm erring on the side of it being easy to make it fast.

@dwijnand
Copy link
Member

(You're conflicting with your own #6529 and #6620, JFYI)

@viktorklang
Copy link
Contributor Author

@dwijnand Rebasing :)

case null => ()
case some => some tryComplete v1
final def firstCompletedOf[T](futures: IterableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] =
if (futures.isEmpty) Future.firstCompletedIsEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think having a stack trace might be helpful for figuring out where you made a bad call to firstCompletedOf. I'd love to get others' thoughts on whether they think it's an important cost.

I also notice that its behaviour in the case of an empty IterableOnce is not specified; the old one effectively returned Future.never, and this one returns a failed Future (which I think is better). Do you think it's a good idea to have the behaviour well-specified (rather than just "whatever the implementation does")?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal I'm definitely OK with throwing a new Exception here as it shouldn't be a hot-path operation. Future.never is also an option… :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna ask on Gitter/Discourse to see what people think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sébastien thinks we should preserve 2.12 behaviour, which is (effectively) Future.never.

I'm actually thinking now that if it does throw, it ought to throw in the calling thread (perhaps using Predef.require?), rather than inside the returned Future, as it is difficult to distinguish between that failure and one from a Future passed in which completed with a failure.

Copy link
Member

@sjrd sjrd May 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Often, when deciding what is the correct behavior for an empty series, there exists a "best answer": use the neutral element of the operation. For example, the n-ary sum ∑ of an empty series is 0, but the n-ary product ∏ of an empty series is 1, which is also "why" a^0 = 1. A neutral element is such that, if you already have a series of n elements, and you add that x to the series, the result of the n-ary operation is unchanged.

For firstCompletedOf, what is an element x that, no matter what series xs you have, firstCompletedOf(xs) is always firstCompletedOf(xs :+ x)? Well ... it's Future.never.

By that logic, the correct answer is actually Future.never. QED.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to Future.never

@@ -338,7 +346,7 @@ private[concurrent] final object Promise {
val executor = _arg.asInstanceOf[ExecutionContext]
try {
_arg = resolved
executor.execute(this) // Safe publication of _arg = v (and _fun)
executor.execute(this) // Safe publication of _arg = resolved (and _fun)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's another v 6 lines up, sorry!

@viktorklang
Copy link
Contributor Author

@dwijnand @NthPortal Rebased

@viktorklang
Copy link
Contributor Author

@NthPortal I added some more documentation to the implementation.

override def run(): Unit = {
require(_tasksLocal.get eq null)
def this(r: Runnable) = {
this(4)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coul you share some context on why the batch size is 4? Maybe it should be configurable?

@viktorklang
Copy link
Contributor Author

assuming that goes well, should we merge for M5 (which we want to build this week) and hope that shipping it will give us a good chance of catching regressions before 2.13.0-RC1/2.13.0?

@SethTisue Sounds good. I have some concerns regarding the callback-concatenation but that should be rather easy to address (performance will regress though)—but if it passes the community build then that should be a good indicator IMO.

(t: Any) => throw new NoSuchElementException("Future.collect partial function is not defined at: " + t) with NoStackTrace

private[concurrent] final val filterFailure =
Failure[Nothing](new NoSuchElementException("Future.filter predicate is not satisfied") with NoStackTrace)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NthPortal Given our conversation about addSuppressed, how should we deal with this? By not reusing cached exceptions? I fear ending up with exceptions with tons of addSuppressed building up over time. Could also leak info between parts of the program.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think I would agree that we shouldn't cache the exception. It still should be fairly lightweight to construct though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other option I can think of is a custom exception (that doesn't extend NoSuchElementException) which disables suppression and stack traces

private[concurrent] final def failedFun[T]: Try[T] => Try[Throwable] = _failedFun.asInstanceOf[Try[T] => Try[Throwable]]

private[concurrent] final val recoverWithFailedMarker: Future[Nothing] =
scala.concurrent.Future.failed(new Throwable with NoStackTrace)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the Throwable constructor which disables suppressed exceptions and writeable stacktraces.

@SethTisue
Copy link
Member

SethTisue commented Aug 9, 2018

@SethTisue SethTisue added the prio:blocker release blocker (used only by core team, only near release time) label Aug 9, 2018
@SethTisue SethTisue changed the title This update / revision of the Future and Promise implementation: Updated & revised Future and Promise implementation Aug 10, 2018
@SethTisue SethTisue added the release-notes worth highlighting in next release notes label Aug 10, 2018
@SethTisue
Copy link
Member

not sure what happened there, maybe a casualty of our disk space woes the other day. let's try again https://scala-ci.typesafe.com/job/scala-2.13.x-integrate-community-build/1332/

* @param executor the execution context on which the `body` is evaluated in
* @return the `Future` holding the result of the computation
*/
final def defer[T](body: => Future[T])(implicit executor: ExecutionContext): Future[T] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, will it be renamed?

@SethTisue
Copy link
Member

@viktorklang looks like this is ready to merge, can you squash it so we have only green commits?

@nafg
Copy link
Contributor

nafg commented Aug 12, 2018

@SethTisue shouldn't defer be renamed?

@viktorklang
Copy link
Contributor Author

@SethTisue I didn't see the community build pass yet. (https://scala-ci.typesafe.com/job/scala-2.13.x-integrate-community-build/1321/)

We have an ongoing conversation w.r.t. Future.defer—would you rather merge now and possibly it in time for RC1? Or should I try to rename + squash sometime tomorrow?

@nafg
Copy link
Contributor

nafg commented Aug 12, 2018

Do you mean because it will take time to come to a conclusion about what to name it?

@SethTisue
Copy link
Member

SethTisue commented Aug 12, 2018

I didn't see the community build pass yet

ah right. well here's a run that completed: https://scala-ci.typesafe.com/job/scala-2.13.x-integrate-community-build/1336/consoleFull . the unexpected successes are probably because your work here is based off older 2.13.x commits. I should probably do one more run after you squash & rebase (or at least rebasing, that's the key thing). the unexpected failures don't look related to this PR to me

We have an ongoing conversation w.r.t. Future.defer—would you rather merge now and possibly it in time for RC1? Or should I try to rename + squash sometime tomorrow?

yes, please come to some sort of decision within the next few days tomorrow so we can merge this for M5. (we probably won't build M5 until later in the week, but we should allow time to get one more round of community build results once this is rebased.)

if you end up deciding on a different name later, we can rename one method for RC1, it's no big deal, very few people will be using the new method yet.

@viktorklang
Copy link
Contributor Author

@SethTisue I just pushed a rebased & squashed commit. Will address the naming shortly (rename to delegate)

@SethTisue
Copy link
Member

 starting stressTestNumberofCallbacks
+java.lang.StackOverflowError
+	at scala.concurrent.impl.Promise$Transformation.handleFailure(Promise.scala:374)
+	at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:367)
+	at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:349)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:334)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:330)
+	at scala.concurrent.impl.Promise$ManyCallbacks.submitWithValue(Promise.scala:335)

@viktorklang
Copy link
Contributor Author

@SethTisue I might have to change that test—new impl will use more stack for certain usage patterns. Question is whether that will become a problem or not. Alternative uses a lot more CPU/Mem.

Introduces,
 Future.delegate[A](=> Future[A])(implicit ec: ExecutionContext): Future[A]
  - this allows for construction of Futures which is deferred
    onto the supplied ExecutionContext
  - Equivalent to: Future.unit.flatMap(_ => expr)

Makes Futures correctly handle RejectedExecutionExceptions,
from ExecutionContext.execute.

Makes Futures correctly handle Thread.interrupt()'s.

Needs more external verification on how much it improves performance
for most, if not all, combinators.

Introduces a different implementation of Linking, with dedicated link
nodes—which are GC:ed upon completion.

Introduces a different implementation of the transformations, instead of
relying primarily on transformWith or onComplete, the DefaultPromise
specializes practically all combinators. It also uses a manually encoded,
unrolled, dispatch instead of polymorphism, and uses an O(n) linking
concat (reverse-order) to avoid creating arbitrary-depth callback tries.

Duration.Undefined doesn't equal Duration.Undefined,
so switching to eq (internal change).
@viktorklang
Copy link
Contributor Author

@SethTisue I've pushed a rebased commit with a modified solution for concat:ing callbacks O(n) iso O(1) for promise linking, and a manually-unrolled dispatch instead of using polymorphism. It's unclear how much it will affect performance in the normal case, I guess we'll have to see what the community build and the M5 feedback says.

@viktorklang
Copy link
Contributor Author

@SethTisue How do I parse the output of that job? There's just so much errors in there who are probably irrelevant?

@SethTisue
Copy link
Member

down near the bottom we see:

unexpected FAILED: genjavadoc
unexpected FAILED: scala-js-stubs
unexpected FAILED: scalariform

the genjavadoc failure is also happening in other 2.13.x runs; I'll take care of it

the other failures appear to have been caused by some transient trouble we were having with our Artifactory on scala-ci, believed to be resolved now. so here's another run: https://scala-ci.typesafe.com/job/scala-2.13.x-integrate-community-build/1356/

@SethTisue
Copy link
Member

new run looks good! no unexpected failures. merging

@SethTisue SethTisue merged commit a62cb07 into scala:2.13.x Aug 18, 2018
@da-liii
Copy link
Contributor

da-liii commented Sep 1, 2018

@SethTisue @viktorklang Will this PR be backported to 2.12.x

I reported scala/bug#11126 . And just find that this PR fixes it.

And I hope some Scala experts help us to reiview my PR for Apache Spark: apache/spark#22304

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
library:concurrent Changes to the concurrency support in stdlib prio:blocker release blocker (used only by core team, only near release time) release-notes worth highlighting in next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.