Skip to content

Commit

Permalink
configure pmap via keyword args
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Apr 28, 2016
1 parent 2e4513b commit 8a04e81
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 127 deletions.
47 changes: 33 additions & 14 deletions base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,25 @@ Note: `for task in AsyncCollector(f, results, c...) end` is equivalent to
"""
type AsyncCollector
f
on_error
results
enumerator::Enumerate
ntasks::Int
end

function AsyncCollector(f, results, c...; ntasks=0)
function AsyncCollector(f, results, c...; ntasks=0, on_error=nothing)
if ntasks == 0
ntasks = 100
end
AsyncCollector(f, results, enumerate(zip(c...)), ntasks)
AsyncCollector(f, on_error, results, enumerate(zip(c...)), ntasks)
end


type AsyncCollectorState
enum_state
active_count::Int
task_done::Condition
done::Bool
in_error::Bool
end


Expand All @@ -49,12 +50,12 @@ wait(state::AsyncCollectorState) = wait(state.task_done)
# Open a @sync block and initialise iterator state.
function start(itr::AsyncCollector)
sync_begin()
AsyncCollectorState(start(itr.enumerator), 0, Condition(), false)
AsyncCollectorState(start(itr.enumerator), 0, Condition(), false, false)
end

# Close @sync block when iterator is done.
function done(itr::AsyncCollector, state::AsyncCollectorState)
if !state.done && done(itr.enumerator, state.enum_state)
if (!state.done && done(itr.enumerator, state.enum_state)) || state.in_error
state.done = true
sync_end()
end
Expand All @@ -72,14 +73,32 @@ function next(itr::AsyncCollector, state::AsyncCollectorState)

# Execute function call and save result asynchronously
@async begin
itr.results[i] = itr.f(args...)
state.active_count -= 1
notify(state.task_done, nothing)
try
itr.results[i] = itr.f(args...)
catch e
try
if isa(itr.on_error, Function)
itr.results[i] = itr.on_error(e)
else
rethrow(e)
end
catch e2
state.in_error = true
notify(state.task_done, e2; error=true)

# The "notify" above raises an exception if "next" is waiting for tasks to finish.
# If the calling task is waiting on sync_end(), the rethrow() below will be captured
# by it.
rethrow(e2)
end
finally
state.active_count -= 1
notify(state.task_done, nothing)
end
end

# Count number of concurrent tasks
state.active_count += 1

return (nothing, state)
end

Expand All @@ -97,8 +116,8 @@ type AsyncGenerator
collector::AsyncCollector
end

function AsyncGenerator(f, c...; ntasks=0)
AsyncGenerator(AsyncCollector(f, Dict{Int,Any}(), c...; ntasks=ntasks))
function AsyncGenerator(f, c...; ntasks=0, on_error=nothing)
AsyncGenerator(AsyncCollector(f, Dict{Int,Any}(), c...; ntasks=ntasks, on_error=on_error))
end


Expand Down Expand Up @@ -153,20 +172,20 @@ Transform collection `c` by applying `@async f` to each element.
For multiple collection arguments, apply f elementwise.
"""
asyncmap(f, c...) = collect(AsyncGenerator(f, c...))
asyncmap(f, c...; on_error=nothing) = collect(AsyncGenerator(f, c...; on_error=on_error))


"""
asyncmap!(f, c)
In-place version of `asyncmap()`.
"""
asyncmap!(f, c) = (for x in AsyncCollector(f, c, c) end; c)
asyncmap!(f, c; on_error=nothing) = (for x in AsyncCollector(f, c, c; on_error=on_error) end; c)


"""
asyncmap!(f, results, c...)
Like `asyncmap()`, but stores output in `results` rather returning a collection.
"""
asyncmap!(f, r, c1, c...) = (for x in AsyncCollector(f, r, c1, c...) end; r)
asyncmap!(f, r, c1, c...; on_error=nothing) = (for x in AsyncCollector(f, r, c1, c...; on_error=on_error) end; r)
27 changes: 18 additions & 9 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -995,29 +995,38 @@ export call
# and added to pmap.jl
# pmap(f, c...) = pmap(default_worker_pool(), f, c...)

function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)
function pmap(f, c...; kwargs...)
kwdict = merge(DEFAULT_PMAP_ARGS, AnyDict(kwargs))
validate_pmap_kwargs(kwdict, append!([:err_retry, :pids, :err_stop], PMAP_KW_NAMES))

err_retry = get(kwdict, :err_retry, nothing)
err_stop = get(kwdict, :err_stop, nothing)
pids = get(kwdict, :pids, nothing)

if err_retry != nothing
depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap)
if err_retry == true
f = retry(f)
end
end

if err_stop != nothing
depwarn("err_stop is deprecated, use pmap(@catch(f), c...).", :pmap)
if err_stop == false
f = @catch(f)
end
end

if pids == nothing
p = default_worker_pool()
else
depwarn("pids is deprecated, use pmap(::WorkerPool, f, c...).", :pmap)
p = WorkerPool(pids)
end

return pmap(p, f, c...)
if err_stop != nothing
depwarn("err_stop is deprecated, use pmap(f, c...; on_error = error_handling_func).", :pmap)
if err_stop == false
kwdict[:on_error] = e->e
end
end

pmap(p, f, c...; distributed=kwdict[:distributed],
batch_size=kwdict[:batch_size],
on_error=kwdict[:on_error])
end


Expand Down
23 changes: 0 additions & 23 deletions base/error.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,3 @@ end

retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...)


"""
@catch(f) -> Function
Returns a lambda that executes `f` and returns either the result of `f` or
an `Exception` thrown by `f`.
**Examples**
```julia
julia> r = @catch(length)([1,2,3])
3
julia> r = @catch(length)()
MethodError(length,())
julia> typeof(r)
MethodError
```
"""
catchf(f) = (args...) -> try f(args...) catch ex; ex end
macro catch(f)
esc(:(Base.catchf($f)))
end
73 changes: 58 additions & 15 deletions base/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,81 @@ Note that `f` must be made available to all worker processes; see
and Loading Packages <man-parallel-computing-code-availability>`)
for details.
"""
function pgenerate(p::WorkerPool, f, c)
if length(p) == 0
return AsyncGenerator(f, c)
function pgenerate(p::WorkerPool, f, c; config=DEFAULT_PMAP_ARGS)
batch_size = config[:batch_size]
on_error = config[:on_error]
distributed = config[:distributed]

if (distributed == false) ||
(length(p) == 0) ||
(length(p) == 1 && fetch(p.channel) == myid())

return AsyncGenerator(f, c; on_error=on_error)
end

if batch_size == :auto
batches = batchsplit(c, min_batch_count = length(p) * 3)
else
batches = batchsplit(c, max_batch_size = batch_size)
end
batches = batchsplit(c, min_batch_count = length(p) * 3)
return flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches))
return flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b; on_error=on_error)), batches; on_error=on_error))
end

pgenerate(p::WorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...))
pgenerate(p::WorkerPool, f, c1, c...; kwargs...) = pgenerate(p, a->f(a...), zip(c1, c...); kwargs...)

pgenerate(f, c) = pgenerate(default_worker_pool(), f, c...)
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))
pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c...; kwargs...)
pgenerate(f, c1, c...; kwargs...) = pgenerate(a->f(a...), zip(c1, c...); kwargs...)


"""
pmap([::WorkerPool], f, c...) -> collection
pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing) -> collection
Transform collection `c` by applying `f` to each element using available
workers and tasks.
For multiple collection arguments, apply f elementwise.
Note that `err_retry=true` and `err_stop=false` are deprecated,
use `pmap(retry(f), c)` or `pmap(@catch(f), c)` instead
(or to retry on a different worker, use `asyncmap(retry(remote(f)), c)`).
Note that `f` must be made available to all worker processes; see
[Code Availability and Loading Packages](:ref:`Code Availability
and Loading Packages <man-parallel-computing-code-availability>`)
for details.
If a worker pool is not specified, all available workers, i.e., the default worker pool
is used.
By default, `pmap` distributes the computation over all specified workers. To use only the
local process and distribute over tasks, specifiy `distributed=false`
`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes
greater than 1, the collection is split into multiple batches, which are distributed across
workers. Each such batch is processed in parallel via tasks in each worker. `batch_size=:auto`
will automtically calculate a batch size depending on the length of the collection and number
of workers available.
Any error stops pmap from processing the remainder of the collection. To override this behavior
you can specify an error handling function via argument `on_error` which takes in a single argument, i.e.,
the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value
which is then returned inline with the results to the caller.
"""
pmap(p::WorkerPool, f, c...) = collect(pgenerate(p, f, c...))
function pmap(p::WorkerPool, f, c...; kwargs...)
kwdict = merge(DEFAULT_PMAP_ARGS, AnyDict(kwargs))
validate_pmap_kwargs(kwdict, PMAP_KW_NAMES)

collect(pgenerate(p, f, c...; config=kwdict))
end


const DEFAULT_PMAP_ARGS = AnyDict(
:distributed => true,
:batch_size => 1,
:on_error => nothing)

const PMAP_KW_NAMES = [:distributed, :batch_size, :on_error]
function validate_pmap_kwargs(kwdict, kwnames)
unsupported = filter(x -> !(x in kwnames), collect(keys(kwdict)))
length(unsupported) > 1 && throw(ArgumentError("keyword arguments $unsupported are not supported."))
nothing
end


"""
Expand All @@ -72,7 +115,7 @@ function batchsplit(c; min_batch_count=1, max_batch_size=100)
# If there are not enough batches, use a smaller batch size
if length(head) < min_batch_count
batch_size = max(1, div(sum(length, head), min_batch_count))
return partition(flatten(head), batch_size)
return partition(collect(flatten(head)), batch_size)
end

return flatten((head, tail))
Expand Down
13 changes: 12 additions & 1 deletion base/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ length(pool::WorkerPool) = pool.count
isready(pool::WorkerPool) = isready(pool.channel)

function remotecall_pool(rc_f, f, pool::WorkerPool, args...; kwargs...)
worker = take!(pool.channel)
# Find an active worker
worker = 0
while true
pool.count == 0 && throw(ErrorException("No active worker available in pool"))
worker = take!(pool.channel)
if worker in procs()
break
else
pool.count = pool.count - 1
end
end

try
rc_f(f, worker, args...; kwargs...)
finally
Expand Down
19 changes: 6 additions & 13 deletions test/error.jl
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license


@test map(typeof, map(@catch(i->[1,2,3][i]), 1:6)) ==
[Int, Int, Int, BoundsError, BoundsError, BoundsError]

@test typeof(@catch(open)("/no/file/with/this/name")) == SystemError


let
function foo_error(c, n)
c[1] += 1
Expand All @@ -33,37 +26,37 @@ let

# 3 failed attempts, so exception is raised
c = [0]
ex = @catch(retry(foo_error))(c,3)
ex = try retry(foo_error)(c,3); catch e; e; end
@test ex.msg == "foo"
@test c[1] == 3

c = [0]
ex = @catch(retry(foo_error, ErrorException))(c,3)
ex = try (retry(foo_error, ErrorException))(c,3); catch e; e; end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 3

c = [0]
ex = @catch(retry(foo_error, e->e.msg == "foo"))(c,3)
ex = try (retry(foo_error, e->e.msg == "foo"))(c,3) catch e; e; end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 3

# No retry if condition does not match
c = [0]
ex = @catch(retry(foo_error, e->e.msg == "bar"))(c,3)
ex = try (retry(foo_error, e->e.msg == "bar"))(c,3) catch e; e; end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 1

c = [0]
ex = @catch(retry(foo_error, e->e.http_status_code == "503"))(c,3)
ex = try (retry(foo_error, e->e.http_status_code == "503"))(c,3) catch e; e; end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 1

c = [0]
ex = @catch(retry(foo_error, SystemError))(c,3)
ex = try (retry(foo_error, SystemError))(c,3) catch e; e; end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 1
Expand Down
Loading

0 comments on commit 8a04e81

Please sign in to comment.