Skip to content

Commit

Permalink
default channels are of max size
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 20, 2015
1 parent d529f86 commit 5caf298
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 74 deletions.
40 changes: 10 additions & 30 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@

abstract AbstractChannel{T}

@enum ChannelState C_OPEN C_CLOSED

type Channel{T} <: AbstractChannel{T}
cid::Int
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::ChannelState
state::Symbol

data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position

Channel(elt, szp1, sz_max) = new(get_next_channel_id(), Condition(), Condition(), C_OPEN,
Channel(elt, szp1, sz_max) = new(get_next_channel_id(), Condition(), Condition(), :open,
Array(T, szp1), szp1, sz_max, 1, 1)
end

Expand All @@ -30,20 +28,21 @@ let next_channel_id=1
end

Channel() = Channel(Any)
Channel(T::Type) = Channel(T::Type, 1)
Channel(T::Type) = Channel(T::Type, typemax(Int))
Channel(sz::Int) = Channel(Any, sz)
function Channel(T::Type, sz::Int)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
csz = sz > 32 ? 32 : sz
Channel{T}(T, csz+1, sz_max)
end

closed_exception() = InvalidStateException("Channel is closed.", :closed)
function close(c::Channel)
c.state = C_CLOSED
notify(c.cond_take, nothing, true, false)
c.state = :closed
notify_error(c::Channel, closed_exception())
c
end
isopen(c::Channel) = (c.state == C_OPEN)
isopen(c::Channel) = (c.state == :open)

type InvalidStateException <: Exception
msg::AbstractString
Expand All @@ -52,27 +51,8 @@ end
InvalidStateException() = InvalidStateException("")
InvalidStateException(msg) = InvalidStateException(msg, 0)

start(c::Channel) = nothing
function done(c::Channel, state)
while isopen(c)
try
# we are waiting either for more data or channel to be closed
wait(c)
isready(c) && return false
catch e
if isa(e, InvalidStateException) && e.state==C_CLOSED
return true
else
rethrow(e)
end
end
end
return true
end
next(c::Channel, state) = (take!(c), nothing)

function put!(c::Channel, v)
!isopen(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
!isopen(c) && throw(closed_exception())
d = c.take_pos - c.put_pos
if (d == 1) || (d == -(c.szp1-1))
# grow the channel if possible
Expand All @@ -96,6 +76,7 @@ function put!(c::Channel, v)
c.data = newdata
else
wait(c.cond_put)
check_open(c)
end
end

Expand All @@ -111,7 +92,7 @@ function fetch(c::Channel)
end

function take!(c::Channel)
!isopen(c) && !isready(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
!isopen(c) && !isready(c) && throw(closed_exception())
while !isready(c)
wait(c.cond_take)
end
Expand All @@ -125,7 +106,6 @@ isready(c::Channel) = (c.take_pos == c.put_pos ? false : true)

function wait(c::Channel)
while !isready(c)
!isopen(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
wait(c.cond_take)
end
nothing
Expand Down
47 changes: 36 additions & 11 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ type ChannelRef{T} <: AbstractRemoteRef{T}
ChannelRef(w, wh, id) = new(w, wh, id)
end

function open_channel(; pid::Int=myid(), T::Type=Any, sz::Int=1)
function open_channel(; pid::Int=myid(), T::Type=Any, sz::Int=typemax(Int))
ref_id = next_ref_id()
remotecall_fetch(pid, (T, sz, whence, ref_id)->create_and_register_channel(T, sz, whence, ref_id), T, sz, myid(), ref_id)
ChannelRef{T}(pid, myid(), ref_id)
Expand Down Expand Up @@ -586,7 +586,7 @@ type RemoteValue{T}
end

RemoteValue(T, sz) = RemoteValue{T}(Channel(T, sz))
RemoteValue() = RemoteValue{Any}(Channel(Any))
RemoteValue() = RemoteValue{Any}(Channel(Any,1))

wait(rv::RemoteValue) = wait(rv.channel)
take!(rv::RemoteValue) = take!(rv.channel)
Expand All @@ -596,19 +596,21 @@ put!(rv::RemoteValue, val::ANY) = put!(rv.channel, val)

## core messages: do, call, fetch, wait, ref, put! ##

function run_work_thunk(thunk)
function run_work_thunk(thunk, print_error)
local result
try
result = thunk()
catch err
print(STDERR, "exception on ", myid(), ": ")
display_error(err,catch_backtrace())
if print_error
print(STDERR, "exception on ", myid(), ": ")
display_error(err,catch_backtrace())
end
result = err
end
result
end
function run_work_thunk(c::Channel, thunk)
put!(c, run_work_thunk(thunk))
put!(c, run_work_thunk(thunk, false))
nothing
end

Expand Down Expand Up @@ -671,7 +673,7 @@ remotecall(id::Integer, f, args...) = remotecall(worker_from_id(id), f, args...)

# faster version of fetch(remotecall(...))
function remotecall_fetch(w::LocalProcess, f, args...)
run_work_thunk(local_remotecall_thunk(f,args))
run_work_thunk(local_remotecall_thunk(f,args), true)
end

function remotecall_fetch(w::Worker, f, args...)
Expand Down Expand Up @@ -737,7 +739,7 @@ end
function get_ref(rid)
rv = get(PGRP.refs, rid, false)
if rv == false
throw(InvalidStateException("Channel does not exist. Closed?"))
throw(InvalidStateException("Channel does not exist. Closed?", :closed))
end
rv
end
Expand Down Expand Up @@ -898,7 +900,7 @@ function handle_msg(::Type{Val{:call_fetch}}, r_stream, w_stream)
f = deserialize(r_stream)
args = deserialize(r_stream)
@schedule begin
v = run_work_thunk(()->f(args...))
v = run_work_thunk(()->f(args...), false)
deliver_result(w_stream, :call_fetch, id, v)
v
end
Expand All @@ -919,7 +921,7 @@ function handle_msg(::Type{Val{:do}}, r_stream, w_stream)
f = deserialize(r_stream)
args = deserialize(r_stream)
@schedule begin
run_work_thunk(()->f(args...))
run_work_thunk(()->f(args...), true)
end
end

Expand Down Expand Up @@ -1591,7 +1593,7 @@ end
function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
pollint > 0 || throw(ArgumentError("cannot set pollint to $pollint seconds"))
start = time()
done = Channel()
done = Channel(1)
timercb(aw) = begin
try
if testcb()
Expand Down Expand Up @@ -1687,3 +1689,26 @@ function getindex(r::AbstractRemoteRef, args...)
end
return remotecall_fetch(r.where, getindex, r, args...)
end

for objtype in [:Channel, :ChannelRef]
# Since other tasks can pop values while the iterator is
# running, we pre-fetch data in the `done` call itself

eval(quote
start{T}(c::($objtype){T}) = Ref{Nullable{T}}(Nullable{T}())
function done(c::($objtype), state::Ref)
try
# we are waiting either for more data or channel to be closed
state.x = take!(c)
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
return true
else
rethrow(e)
end
end
end
next{T}(c::($objtype){T}, state) = (get(state.x), Ref{Nullable{T}}(Nullable{T}()))
end)
end
4 changes: 2 additions & 2 deletions base/sharedarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])
end

func_mapshmem = () -> begin
c = open_channel()
c = open_channel(sz=1)
put!(c, shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR))
c
end
Expand Down Expand Up @@ -140,7 +140,7 @@ function reshape{T,N}(a::SharedArray{T}, dims::NTuple{N,Int})
refs = Array(ChannelRef, length(a.pids))
@sync begin
for (i, p) in enumerate(a.pids)
refs[i] = remotecall_fetch(p, (r,d)->(c=open_channel(); put!(c, reshape(fetch(r),d)); c), a.refs[i], dims)
refs[i] = remotecall_fetch(p, (r,d)->(c=open_channel(sz=1); put!(c, reshape(fetch(r),d)); c), a.refs[i], dims)
end
end

Expand Down
2 changes: 1 addition & 1 deletion doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ case the loop runs till the channel is open. The loop variable takes on
all values added to the channel:

Channels can also be used as a means of inter-process communication. To do this
they need to be created via ``open_channel(; pid::Int=myid(), T::Type=Any, sz::Int=1)``
they need to be created via ``open_channel(; pid::Int=myid(), T::Type=Any, sz::Int=typemax(Int))``
which returns a ``ChannelRef``.

Open remote channels need to be released explicitly via a ``close`` on
Expand Down
6 changes: 3 additions & 3 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ Tasks

Variants:

Channel() returns a ``Channel{Any}`` of size 1.
Channel(T::Type) returns a ``Channel{T}`` of size 1.
Channel() returns a ``Channel{Any}`` of size typemax(Int).
Channel(T::Type) returns a ``Channel{T}`` of size typemax(Int).
Channel(sz::Int) returns a ``Channel{Any}`` of size ``sz``.

.. function:: ``open_channel(; pid::Int=myid(), T::Type=Any, sz::Int=1)``
.. function:: ``open_channel(; pid::Int=myid(), T::Type=Any, sz::Int=typemax(Int))``

Returns a ``ChannelRef`` which is a reference to a channel on process ``pid``,
capable of holding a maximum number ``sz`` objects of type ``T``.
Expand Down
4 changes: 2 additions & 2 deletions test/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ end

function test_timeout(tval)
tic()
channel = Channel()
channel = Channel(1)
@async test_file_poll(channel,tval)
tr = take!(channel)
t_elapsed = toq()
Expand All @@ -125,7 +125,7 @@ end

function test_touch(slval)
tval = slval*1.1
channel = Channel()
channel = Channel(1)
@async test_file_poll(channel, tval)
sleep(tval/10) # ~ one poll period
f = open(file,"a")
Expand Down
Loading

0 comments on commit 5caf298

Please sign in to comment.