Skip to content

Commit

Permalink
Allows to build promise trees with lazy evaluated branches
Browse files Browse the repository at this point in the history
- Adds #chain_delay, #then_delay, #rescue_delay which are same as #chain,
  #then, #rescue but are not evaluated automatically but only when requested
  by #value.
- Restructure class hierarchy. Only one Future with Multiple Promise
  implementations which are hidden to the user. Provides better encapsulation.
- Delay is now implemented as a Promise descendant.
  • Loading branch information
pitr-ch committed Nov 5, 2014
1 parent 16783f3 commit 3b877db
Showing 1 changed file with 166 additions and 72 deletions.
238 changes: 166 additions & 72 deletions lib/concurrent/next.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def future(executor = :fast, &block)

# @return [Delay]
def delay(executor = :fast, &block)
Delay.new(executor, &block)
Delay.new(nil, executor, &block).future
end

alias_method :async, :future
Expand Down Expand Up @@ -144,21 +144,22 @@ module FutureHelpers
# @return [Future]
def join(*futures)
countdown = Concurrent::AtomicFixnum.new futures.size
promise = Promise.new.add_blocked_by(*futures) # TODO add injectable executor
promise = ExternalPromise.new(futures)
futures.each { |future| future.add_callback :join, countdown, promise, *futures }
promise.future
end

# @return [Future]
def execute(executor = :fast, &block)
promise = Promise.new(executor)
promise = ExternalPromise.new([], executor)
Next.executor(executor).post { promise.evaluate_to &block }
promise.future
end
end

class Future < SynchronizedObject
extend FutureHelpers
extend Shortcuts

singleton_class.send :alias_method, :dataflow, :join

Expand Down Expand Up @@ -264,26 +265,47 @@ def exception(*args)
reason.exception(*args)
end

# TODO add #then_delay { ... } and such to be able to chain delayed evaluations
# TODO needs better name
def connect(executor = default_executor)
ConnectedPromise.new(self, executor).future
end

# @yield [success, value, reason] of the parent
def chain(executor = default_executor, &callback)
add_callback :chain_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback
add_callback :chain_callback, executor, promise = ExternalPromise.new([self], default_executor), callback
promise.future
end

# @yield [value] executed only on parent success
def then(executor = default_executor, &callback)
add_callback :then_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback
add_callback :then_callback, executor, promise = ExternalPromise.new([self], default_executor), callback
promise.future
end

# @yield [reason] executed only on parent failure
def rescue(executor = default_executor, &callback)
add_callback :rescue_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback
add_callback :rescue_callback, executor, promise = ExternalPromise.new([self], default_executor), callback
promise.future
end

# lazy version of #chain
def chain_delay(executor = default_executor, &callback)
delay = Delay.new(self, executor) { callback_on_completion callback }
delay.future
end

# lazy version of #then
def then_delay(executor = default_executor, &callback)
delay = Delay.new(self, executor) { conditioned_callback callback }
delay.future
end

# lazy version of #rescue
def rescue_delay(executor = default_executor, &callback)
delay = Delay.new(self, executor) { callback_on_failure callback }
delay.future
end

# @yield [success, value, reason] executed async on `executor` when completed
# @return self
def on_completion(executor = default_executor, &callback)
Expand Down Expand Up @@ -399,27 +421,15 @@ def with_promise(promise, &block)
end

def chain_callback(executor, promise, callback)
with_async(executor) do
with_promise(promise) do
callback_on_completion callback
end
end
with_async(executor) { with_promise(promise) { callback_on_completion callback } }
end

def then_callback(executor, promise, callback)
with_async(executor) do
with_promise(promise) do
success? ? callback.call(value) : raise(reason)
end
end
with_async(executor) { with_promise(promise) { conditioned_callback callback } }
end

def rescue_callback(executor, promise, callback)
with_async(executor) do
with_promise(promise) do
callback_on_failure callback
end
end
with_async(executor) { with_promise(promise) { callback_on_failure callback } }
end

def with_async(executor)
Expand Down Expand Up @@ -450,20 +460,20 @@ def callback_on_failure(callback)
callback.call reason if failed?
end

def conditioned_callback(callback)
self.success? ? callback.call(value) : raise(reason)
end

def call_callback(method, *args)
self.send method, *args
end
end

class Promise < SynchronizedObject
# @api private
def initialize(executor_or_future = :fast)
def initialize(executor = :fast)
super()
future = if Future === executor_or_future
executor_or_future
else
Future.new(self, executor_or_future)
end
future = Future.new(self, executor)

synchronize do
@future = future
Expand All @@ -480,6 +490,42 @@ def blocked_by
synchronize { @blocked_by }
end

def state
future.state
end

def touch
blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) }
end

def to_s
"<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>"
end

def inspect
"#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>"
end

# @api private
def complete(success, value, reason, raise = true)
future.complete(success, value, reason, raise)
synchronize { @blocked_by.clear }
end

private

def add_blocked_by(*futures)
synchronize { @blocked_by += futures }
self
end
end

class ExternalPromise < Promise
def initialize(blocked_by_futures, executor_or_future = :fast)
super executor_or_future
add_blocked_by *blocked_by_futures
end

# Set the `IVar` to a value and wake or notify all threads waiting on it.
#
# @param [Object] value the value to store in the `IVar`
Expand All @@ -506,15 +552,6 @@ def try_fail(reason = StandardError.new)
!!complete(false, nil, reason, false)
end

def complete(success, value, reason, raise = true)
future.complete(success, value, reason, raise)
synchronize { @blocked_by.clear }
end

def state
future.state
end

# @return [Future]
def evaluate_to(&block)
success block.call
Expand All @@ -526,56 +563,59 @@ def evaluate_to(&block)
def evaluate_to!(&block)
evaluate_to(&block).no_error!
end
end

class ConnectedPromise < Promise
def initialize(future, executor_or_future = :fast)
super(executor_or_future)
connect_to future
end

private

# @return [Future]
def connect_to(future)
add_blocked_by future
future.add_callback :set_promise_on_completion, self
self.future
end

def touch
blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) }
end

def to_s
"<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>"
end

def inspect
"#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>"
end

# @api private
def add_blocked_by(*futures)
synchronize { @blocked_by += futures }
self
end
end

class Delay < Future

def initialize(default_executor = :fast, &block)
super(Promise.new(self), default_executor)
raise ArgumentError.new('no block given') unless block_given?
class Delay < Promise
def initialize(blocked_by_future, executor_or_future = :fast, &task)
super(executor_or_future)
synchronize do
@task = task
@computing = false
@task = block
end
add_blocked_by blocked_by_future if blocked_by_future
end

def wait(timeout = nil)
touch
super timeout
def touch
if blocked_by.all?(&:completed?)
execute_once
else
blocked_by.each { |f| f.on_success! { self.touch } unless synchronize { @touched } }
super
end
end

# starts executing the value without blocking
def touch
private

def execute_once
execute, task = synchronize do
[(@computing = true unless @computing), @task]
end

Next.executor(default_executor).post { promise.evaluate_to &task } if execute
if execute
Next.executor(future.default_executor).post do
begin
complete true, task.call, nil
rescue => error
complete false, nil, error
end
end
end
self
end
end
Expand Down Expand Up @@ -610,7 +650,7 @@ def touch
future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR
future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR
future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR
future5 = Promise.new(:io).connect_to(future3)
future5 = future3.connect(:io) # connects new future with different executor, the new future is completed when future3 is
future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5
future7 = Future.join(future0, future3)

Expand Down Expand Up @@ -642,8 +682,9 @@ def touch
puts '-- promise like tree'

# if head of the tree is not constructed with #future but with #delay it does not start execute,
# it's triggered later by calling wait or value on any of the depedent futures or the delay itself
tree = (head = delay { 1 }).then { |v| v.succ }.then(&:succ).then(&:succ)
# it's triggered later by calling wait or value on any of the dependent futures or the delay itself
three = (head = delay { 1 }).then { |v| v.succ }.then(&:succ)
four = three.then_delay(&:succ)

# meaningful to_s and inspect defined for Future and Promise
puts head
Expand All @@ -652,12 +693,65 @@ def touch
# <#Concurrent::Next::Delay:7f89b4bccc68 pending [<#Concurrent::Next::Promise:7f89b4bccb00 pending>]]>
p head.callbacks
# [[:then_callback, :fast, <#Concurrent::Next::Promise:0x7fa54b31d218 pending [<#Concurrent::Next::Delay:0x7fa54b31d380 pending>]>, #<Proc:0x007fa54b31d290>]]
p tree.value

# evaluates only up to three, four is left unevaluated
p three.value # 3
p four, four.promise
# until value is called on four
p four.value # 4

# futures hidden behind two delays trigger evaluation of both
double_delay = delay { 1 }.then_delay(&:succ)
p double_delay.value # 2

puts '-- graph'

head = future { 1 }
branch1 = head.then(&:succ).then(&:succ)
branch2 = head.then(&:succ).then_delay(&:succ)
result = Future.join(branch1, branch2).then { |b1, b2| b1 + b2 }

sleep 0.1
p branch1.completed?, branch2.completed? # true, false
# force evaluation of whole graph
p result.value # 6

puts '-- bench'
require 'benchmark'

Benchmark.bmbm(20) do |b|
module Benchmark
def self.bmbmbm(rehearsals, width)
job = Job.new(width)
yield(job)
width = job.width + 1
sync = STDOUT.sync
STDOUT.sync = true

# rehearsal
rehearsals.times do
puts 'Rehearsal '.ljust(width+CAPTION.length, '-')
ets = job.list.inject(Tms.new) { |sum, (label, item)|
print label.ljust(width)
res = Benchmark.measure(&item)
print res.format
sum + res
}.format("total: %tsec")
print " #{ets}\n\n".rjust(width+CAPTION.length+2, '-')
end

# take
print ' '*width + CAPTION
job.list.map { |label, item|
GC.start
print label.ljust(width)
Benchmark.measure(label, &item).tap { |res| print res }
}
ensure
STDOUT.sync = sync unless sync.nil?
end
end

Benchmark.bmbmbm(20, 20) do |b|

parents = [RubySynchronizedObject, (JavaSynchronizedObject if defined? JavaSynchronizedObject)].compact
classes = parents.map do |parent|
Expand Down

0 comments on commit 3b877db

Please sign in to comment.