Skip to content

Commit

Permalink
Merge pull request #462 from ruby-concurrency/sync-updates
Browse files Browse the repository at this point in the history
Sync updates
  • Loading branch information
jdantonio committed Nov 4, 2015
2 parents 9f58449 + f465e95 commit c690eb7
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
s.default { print "no message received\n" }
end

msg = 'hi'
message = 'hi'
Channel.select do |s|
s.put(messages, msg) { |m| print "sent message #{m}\n" }
s.put(messages, message) { |msg| print "sent message #{msg}\n" }
s.default { print "no message sent\n" }
end

Expand Down
4 changes: 3 additions & 1 deletion lib/concurrent/channel/tick.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'concurrent/synchronization'
require 'concurrent/utility/monotonic_time'

module Concurrent
Expand All @@ -13,8 +14,9 @@ class Channel
# @see Concurrent.monotonic_time
# @see Concurrent::Channel.ticker
# @see Concurrent::Channel.timer
class Tick
class Tick < Synchronization::Object
include Comparable
safe_initialization!

STRING_FORMAT = '%F %T.%6N %z %Z'.freeze

Expand Down
33 changes: 9 additions & 24 deletions lib/concurrent/concern/dereferenceable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ module Concern
#
# @!macro copy_options
module Dereferenceable
# NOTE: This module is going away in 2.0. In the mean time we need it to
# play nicely with the synchronization layer. This means that the
# including class SHOULD be synchronized and it MUST implement a
# `#synchronize` method. Not doing so will lead to runtime errors.

# Return the value this object represents after applying the options specified
# by the `#set_deref_options` method.
#
# @return [Object] the current value of the object
def value
mutex.synchronize { apply_deref_options(@value) }
synchronize { apply_deref_options(@value) }
end
alias_method :deref, :value

Expand All @@ -25,43 +29,24 @@ def value
#
# @param [Object] value the new value
def value=(value)
mutex.synchronize{ @value = value }
end

# A mutex lock used for synchronizing thread-safe operations. Methods defined
# by `Dereferenceable` are synchronized using the `Mutex` returned from this
# method. Operations performed by the including class that operate on the
# `@value` instance variable should be locked with this `Mutex`.
#
# @return [Mutex] the synchronization object
def mutex
@mutex
end

# Initializes the internal `Mutex`.
#
# @note This method *must* be called from within the constructor of the including class.
#
# @see #mutex
def init_mutex(mutex = Mutex.new)
@mutex = mutex
synchronize{ @value = value }
end

# @!macro [attach] dereferenceable_set_deref_options
# Set the options which define the operations #value performs before
# returning data to the caller (dereferencing).
#
#
# @note Most classes that include this module will call `#set_deref_options`
# from within the constructor, thus allowing these options to be set at
# object creation.
#
#
# @param [Hash] opts the options defining dereference behavior.
# @option opts [String] :dup_on_deref (false) call `#dup` before returning the data
# @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
def set_deref_options(opts = {})
mutex.synchronize{ ns_set_deref_options(opts) }
synchronize{ ns_set_deref_options(opts) }
end

# @!macro dereferenceable_set_deref_options
Expand Down
19 changes: 11 additions & 8 deletions lib/concurrent/concern/obligation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ module Concern

module Obligation
include Concern::Dereferenceable
# NOTE: The Dereferenceable module is going away in 2.0. In the mean time
# we need it to place nicely with the synchronization layer. This means
# that the including class SHOULD be synchronized and it MUST implement a
# `#synchronize` method. Not doing so will lead to runtime errors.

# Has the obligation been fulfilled?
#
Expand Down Expand Up @@ -104,7 +108,7 @@ def value!(timeout = nil)
#
# @return [Symbol] the current state
def state
mutex.synchronize { @state }
synchronize { @state }
end

# If an exception was raised during processing this will return the
Expand All @@ -113,7 +117,7 @@ def state
#
# @return [Exception] the exception raised during processing or `nil`
def reason
mutex.synchronize { @reason }
synchronize { @reason }
end

# @example allows Obligation to be risen
Expand All @@ -132,8 +136,7 @@ def get_arguments_from(opts = {})
end

# @!visibility private
def init_obligation(*args)
init_mutex(*args)
def init_obligation
@event = Event.new
end

Expand All @@ -155,20 +158,20 @@ def set_state(success, value, reason)

# @!visibility private
def state=(value)
mutex.synchronize { ns_set_state(value) }
synchronize { ns_set_state(value) }
end

# Atomic compare and set operation
# State is set to `next_state` only if `current state == expected_current`.
#
# @param [Symbol] next_state
# @param [Symbol] expected_current
#
#
# @return [Boolean] true is state is changed, false otherwise
#
# @!visibility private
def compare_and_set_state(next_state, *expected_current)
mutex.synchronize do
synchronize do
if expected_current.include? @state
@state = next_state
true
Expand All @@ -184,7 +187,7 @@ def compare_and_set_state(next_state, *expected_current)
#
# @!visibility private
def if_state(*expected_states)
mutex.synchronize do
synchronize do
raise ArgumentError.new('no block given') unless block_given?

if expected_states.include? @state
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/delay.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def reconfigure(&block)
protected

def ns_initialize(opts, &block)
init_obligation(self)
init_obligation
set_deref_options(opts)
@executor = opts[:executor]

Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/edge/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def post_on(executor, *args, &job)
# Represents an event which will happen in future (will be completed). It has to always happen.
class Event < Synchronization::LockableObject
safe_initialization!
private *attr_volatile_with_cas(:internal_state)
private(*attr_volatile_with_cas(:internal_state))
public :internal_state
include Concern::Deprecation
include Concern::Logging
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/ivar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def try_set(value = NULL, &block)
# @!visibility private
def ns_initialize(value, opts)
value = yield if block_given?
init_obligation(self)
init_obligation
self.observers = Collection::CopyOnWriteObserverSet.new
set_deref_options(opts)

Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/lazy_register.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Concurrent
# @!macro edge_warning
class LazyRegister < Synchronization::Object

private *attr_volatile_with_cas(:data)
private(*attr_volatile_with_cas(:data))

def initialize
super
Expand Down
9 changes: 6 additions & 3 deletions lib/concurrent/maybe.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'concurrent/synchronization'

module Concurrent

# A `Maybe` encapsulates an optional value. A `Maybe` either contains a value
Expand Down Expand Up @@ -99,8 +101,9 @@ module Concurrent
#
# @see https://hackage.haskell.org/package/base-4.2.0.1/docs/Data-Maybe.html Haskell Data.Maybe
# @see https://github.com/purescript/purescript-maybe/blob/master/docs/Data.Maybe.md PureScript Data.Maybe
class Maybe
class Maybe < Synchronization::Object
include Comparable
safe_initialization!

# Indicates that the given attribute has not been set.
# When `Just` the {#nothing} getter will return `NONE`.
Expand Down Expand Up @@ -168,15 +171,15 @@ def self.nothing(error = '')
end

# Is this `Maybe` a `Just` (successfully fulfilled with a value)?
#
#
# @return [Boolean] True if `Just` or false if `Nothing`.
def just?
! nothing?
end
alias :fulfilled? :just?

# Is this `Maybe` a `nothing` (rejected with an exception upon fulfillment)?
#
#
# @return [Boolean] True if `Nothing` or false if `Just`.
def nothing?
@nothing != NONE
Expand Down
11 changes: 9 additions & 2 deletions lib/concurrent/mvar.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'concurrent/concern/dereferenceable'
require 'concurrent/synchronization'

module Concurrent

Expand Down Expand Up @@ -34,9 +35,9 @@ module Concurrent
# 2. S. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](http://dl.acm.org/citation.cfm?id=237794).
# In Proceedings of the 23rd Symposium on Principles of Programming Languages
# (PoPL), 1996.
class MVar

class MVar < Synchronization::Object
include Concern::Dereferenceable
safe_initialization!

# Unique value that represents that an `MVar` was empty
EMPTY = Object.new
Expand Down Expand Up @@ -200,6 +201,12 @@ def full?
!empty?
end

protected

def synchronize(&block)
@mutex.synchronize(&block)
end

private

def unlocked_empty?
Expand Down
1 change: 0 additions & 1 deletion lib/concurrent/timer_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ def timeout_interval=(value)
private

def ns_initialize(opts, &task)
init_mutex(self)
set_deref_options(opts)

self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
Expand Down
4 changes: 3 additions & 1 deletion lib/concurrent/tvar.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'set'
require 'concurrent/synchronization'

module Concurrent

Expand All @@ -8,7 +9,8 @@ module Concurrent
# @!macro thread_safe_variable_comparison
#
# {include:file:doc/tvar.md}
class TVar
class TVar < Synchronization::Object
safe_initialization!

# Create a new `TVar` with an initial value.
def initialize(value)
Expand Down
12 changes: 4 additions & 8 deletions spec/concurrent/concern/obligation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ module Concern

let (:obligation_class) do

Class.new do
Class.new(Synchronization::LockableObject) do
include Obligation

def initialize
init_mutex
end

public :state=, :compare_and_set_state, :if_state, :mutex
public :state=, :compare_and_set_state, :if_state
attr_writer :value, :reason
end
end
Expand Down Expand Up @@ -278,7 +273,8 @@ def initialize
end

it 'should execute the block within the mutex' do
obligation.if_state(:unscheduled) { expect(obligation.mutex).to be_locked }
expect(obligation).to receive(:synchronize)
obligation.if_state(:unscheduled) { nil }
end
end

Expand Down

0 comments on commit c690eb7

Please sign in to comment.