diff --git a/examples/go-by-example-channels/non-blocking-channel-operations.rb b/examples/go-by-example-channels/non-blocking-channel-operations.rb index 572d4b1f7..71eeaef65 100755 --- a/examples/go-by-example-channels/non-blocking-channel-operations.rb +++ b/examples/go-by-example-channels/non-blocking-channel-operations.rb @@ -15,9 +15,9 @@ s.default { print "no message received\n" } end -msg = 'hi' +message = 'hi' Channel.select do |s| - s.put(messages, msg) { |m| print "sent message #{m}\n" } + s.put(messages, message) { |msg| print "sent message #{msg}\n" } s.default { print "no message sent\n" } end diff --git a/lib/concurrent/channel/tick.rb b/lib/concurrent/channel/tick.rb index 557ba8caf..f4830482b 100644 --- a/lib/concurrent/channel/tick.rb +++ b/lib/concurrent/channel/tick.rb @@ -1,3 +1,4 @@ +require 'concurrent/synchronization' require 'concurrent/utility/monotonic_time' module Concurrent @@ -13,8 +14,9 @@ class Channel # @see Concurrent.monotonic_time # @see Concurrent::Channel.ticker # @see Concurrent::Channel.timer - class Tick + class Tick < Synchronization::Object include Comparable + safe_initialization! STRING_FORMAT = '%F %T.%6N %z %Z'.freeze diff --git a/lib/concurrent/concern/dereferenceable.rb b/lib/concurrent/concern/dereferenceable.rb index 796225855..b58d2cef4 100644 --- a/lib/concurrent/concern/dereferenceable.rb +++ b/lib/concurrent/concern/dereferenceable.rb @@ -9,13 +9,17 @@ module Concern # # @!macro copy_options module Dereferenceable + # NOTE: This module is going away in 2.0. In the mean time we need it to + # play nicely with the synchronization layer. This means that the + # including class SHOULD be synchronized and it MUST implement a + # `#synchronize` method. Not doing so will lead to runtime errors. # Return the value this object represents after applying the options specified # by the `#set_deref_options` method. # # @return [Object] the current value of the object def value - mutex.synchronize { apply_deref_options(@value) } + synchronize { apply_deref_options(@value) } end alias_method :deref, :value @@ -25,43 +29,24 @@ def value # # @param [Object] value the new value def value=(value) - mutex.synchronize{ @value = value } - end - - # A mutex lock used for synchronizing thread-safe operations. Methods defined - # by `Dereferenceable` are synchronized using the `Mutex` returned from this - # method. Operations performed by the including class that operate on the - # `@value` instance variable should be locked with this `Mutex`. - # - # @return [Mutex] the synchronization object - def mutex - @mutex - end - - # Initializes the internal `Mutex`. - # - # @note This method *must* be called from within the constructor of the including class. - # - # @see #mutex - def init_mutex(mutex = Mutex.new) - @mutex = mutex + synchronize{ @value = value } end # @!macro [attach] dereferenceable_set_deref_options # Set the options which define the operations #value performs before # returning data to the caller (dereferencing). - # + # # @note Most classes that include this module will call `#set_deref_options` # from within the constructor, thus allowing these options to be set at # object creation. - # + # # @param [Hash] opts the options defining dereference behavior. # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing # the internal value and returning the value returned from the proc def set_deref_options(opts = {}) - mutex.synchronize{ ns_set_deref_options(opts) } + synchronize{ ns_set_deref_options(opts) } end # @!macro dereferenceable_set_deref_options diff --git a/lib/concurrent/concern/obligation.rb b/lib/concurrent/concern/obligation.rb index 9f1d6c849..17c035dbc 100644 --- a/lib/concurrent/concern/obligation.rb +++ b/lib/concurrent/concern/obligation.rb @@ -9,6 +9,10 @@ module Concern module Obligation include Concern::Dereferenceable + # NOTE: The Dereferenceable module is going away in 2.0. In the mean time + # we need it to place nicely with the synchronization layer. This means + # that the including class SHOULD be synchronized and it MUST implement a + # `#synchronize` method. Not doing so will lead to runtime errors. # Has the obligation been fulfilled? # @@ -104,7 +108,7 @@ def value!(timeout = nil) # # @return [Symbol] the current state def state - mutex.synchronize { @state } + synchronize { @state } end # If an exception was raised during processing this will return the @@ -113,7 +117,7 @@ def state # # @return [Exception] the exception raised during processing or `nil` def reason - mutex.synchronize { @reason } + synchronize { @reason } end # @example allows Obligation to be risen @@ -132,8 +136,7 @@ def get_arguments_from(opts = {}) end # @!visibility private - def init_obligation(*args) - init_mutex(*args) + def init_obligation @event = Event.new end @@ -155,7 +158,7 @@ def set_state(success, value, reason) # @!visibility private def state=(value) - mutex.synchronize { ns_set_state(value) } + synchronize { ns_set_state(value) } end # Atomic compare and set operation @@ -163,12 +166,12 @@ def state=(value) # # @param [Symbol] next_state # @param [Symbol] expected_current - # + # # @return [Boolean] true is state is changed, false otherwise # # @!visibility private def compare_and_set_state(next_state, *expected_current) - mutex.synchronize do + synchronize do if expected_current.include? @state @state = next_state true @@ -184,7 +187,7 @@ def compare_and_set_state(next_state, *expected_current) # # @!visibility private def if_state(*expected_states) - mutex.synchronize do + synchronize do raise ArgumentError.new('no block given') unless block_given? if expected_states.include? @state diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 492d2ed75..d1fc22662 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -155,7 +155,7 @@ def reconfigure(&block) protected def ns_initialize(opts, &block) - init_obligation(self) + init_obligation set_deref_options(opts) @executor = opts[:executor] diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index 9c4a345de..da97fce2c 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -131,7 +131,7 @@ def post_on(executor, *args, &job) # Represents an event which will happen in future (will be completed). It has to always happen. class Event < Synchronization::LockableObject safe_initialization! - private *attr_volatile_with_cas(:internal_state) + private(*attr_volatile_with_cas(:internal_state)) public :internal_state include Concern::Deprecation include Concern::Logging diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 4ed6d7158..994a620b2 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -153,7 +153,7 @@ def try_set(value = NULL, &block) # @!visibility private def ns_initialize(value, opts) value = yield if block_given? - init_obligation(self) + init_obligation self.observers = Collection::CopyOnWriteObserverSet.new set_deref_options(opts) diff --git a/lib/concurrent/lazy_register.rb b/lib/concurrent/lazy_register.rb index a47b58cf5..d4d656a28 100644 --- a/lib/concurrent/lazy_register.rb +++ b/lib/concurrent/lazy_register.rb @@ -18,7 +18,7 @@ module Concurrent # @!macro edge_warning class LazyRegister < Synchronization::Object - private *attr_volatile_with_cas(:data) + private(*attr_volatile_with_cas(:data)) def initialize super diff --git a/lib/concurrent/maybe.rb b/lib/concurrent/maybe.rb index 6412fb93b..6014594c4 100644 --- a/lib/concurrent/maybe.rb +++ b/lib/concurrent/maybe.rb @@ -1,3 +1,5 @@ +require 'concurrent/synchronization' + module Concurrent # A `Maybe` encapsulates an optional value. A `Maybe` either contains a value @@ -99,8 +101,9 @@ module Concurrent # # @see https://hackage.haskell.org/package/base-4.2.0.1/docs/Data-Maybe.html Haskell Data.Maybe # @see https://github.com/purescript/purescript-maybe/blob/master/docs/Data.Maybe.md PureScript Data.Maybe - class Maybe + class Maybe < Synchronization::Object include Comparable + safe_initialization! # Indicates that the given attribute has not been set. # When `Just` the {#nothing} getter will return `NONE`. @@ -168,7 +171,7 @@ def self.nothing(error = '') end # Is this `Maybe` a `Just` (successfully fulfilled with a value)? - # + # # @return [Boolean] True if `Just` or false if `Nothing`. def just? ! nothing? @@ -176,7 +179,7 @@ def just? alias :fulfilled? :just? # Is this `Maybe` a `nothing` (rejected with an exception upon fulfillment)? - # + # # @return [Boolean] True if `Nothing` or false if `Just`. def nothing? @nothing != NONE diff --git a/lib/concurrent/mvar.rb b/lib/concurrent/mvar.rb index 4d51020c9..0d18dfac2 100644 --- a/lib/concurrent/mvar.rb +++ b/lib/concurrent/mvar.rb @@ -1,4 +1,5 @@ require 'concurrent/concern/dereferenceable' +require 'concurrent/synchronization' module Concurrent @@ -34,9 +35,9 @@ module Concurrent # 2. S. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](http://dl.acm.org/citation.cfm?id=237794). # In Proceedings of the 23rd Symposium on Principles of Programming Languages # (PoPL), 1996. - class MVar - + class MVar < Synchronization::Object include Concern::Dereferenceable + safe_initialization! # Unique value that represents that an `MVar` was empty EMPTY = Object.new @@ -200,6 +201,12 @@ def full? !empty? end + protected + + def synchronize(&block) + @mutex.synchronize(&block) + end + private def unlocked_empty? diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index 18df85f5e..6aa303148 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -271,7 +271,6 @@ def timeout_interval=(value) private def ns_initialize(opts, &task) - init_mutex(self) set_deref_options(opts) self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL diff --git a/lib/concurrent/tvar.rb b/lib/concurrent/tvar.rb index 75386f497..b937ffcab 100644 --- a/lib/concurrent/tvar.rb +++ b/lib/concurrent/tvar.rb @@ -1,4 +1,5 @@ require 'set' +require 'concurrent/synchronization' module Concurrent @@ -8,7 +9,8 @@ module Concurrent # @!macro thread_safe_variable_comparison # # {include:file:doc/tvar.md} - class TVar + class TVar < Synchronization::Object + safe_initialization! # Create a new `TVar` with an initial value. def initialize(value) diff --git a/spec/concurrent/concern/obligation_spec.rb b/spec/concurrent/concern/obligation_spec.rb index a03c4c1fa..b78b08490 100644 --- a/spec/concurrent/concern/obligation_spec.rb +++ b/spec/concurrent/concern/obligation_spec.rb @@ -5,14 +5,9 @@ module Concern let (:obligation_class) do - Class.new do + Class.new(Synchronization::LockableObject) do include Obligation - - def initialize - init_mutex - end - - public :state=, :compare_and_set_state, :if_state, :mutex + public :state=, :compare_and_set_state, :if_state attr_writer :value, :reason end end @@ -278,7 +273,8 @@ def initialize end it 'should execute the block within the mutex' do - obligation.if_state(:unscheduled) { expect(obligation.mutex).to be_locked } + expect(obligation).to receive(:synchronize) + obligation.if_state(:unscheduled) { nil } end end