Skip to content

Commit

Permalink
cluster manager fixes (#30172)
Browse files Browse the repository at this point in the history
* kill workers which don't launch properly

* don't emit spurious error messages

* document how to asynchronously launch workers

(cherry picked from commit 121e814)
  • Loading branch information
bjarthur authored and KristofferC committed Dec 30, 2018
1 parent 2d1c925 commit aede024
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
51 changes: 44 additions & 7 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ function redirect_worker_output(ident, stream)
end
end

struct LaunchWorkerError <: Exception
msg::String
end

Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg)

# The default TCP transport relies on the worker listening on a free
# port available and printing its bind address and port.
Expand Down Expand Up @@ -276,7 +281,7 @@ function read_worker_host_port(io::IO)

conninfo = fetch(readtask)
if isempty(conninfo) && !isopen(io)
error("Unable to read host:port string from worker. Launch command exited with error?")
throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?"))
end

ntries -= 1
Expand All @@ -290,13 +295,13 @@ function read_worker_host_port(io::IO)
end
close(io)
if ntries > 0
error("Timed out waiting to read host:port string from worker.")
throw(LaunchWorkerError("Timed out waiting to read host:port string from worker."))
else
error("Unexpected output from worker launch command. Host:port string not found.")
throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found."))
end
finally
for line in leader
println("\tFrom failed worker startup:\t", line)
println("\tFrom worker startup:\t", line)
end
end
end
Expand Down Expand Up @@ -358,6 +363,34 @@ the package `ClusterManagers.jl`.
The number of seconds a newly launched worker waits for connection establishment from the
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
environment. Relevant only when using TCP/IP as transport.
To launch workers without blocking the REPL, or the containing function
if launching workers programmatically, execute `addprocs` in its own task.
# Examples
```
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
```
```
# Utilize workers as and when they come online
if nprocs() > 1 # Ensure at least one new worker is available
.... # perform distributed execution
end
```
```
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
```
"""
function addprocs(manager::ClusterManager; kwargs...)
init_multi()
Expand Down Expand Up @@ -503,9 +536,13 @@ function create_worker(manager, wconfig)
local r_s, w_s
try
(r_s, w_s) = connect(manager, w.id, wconfig)
catch e
deregister_worker(w.id)
rethrow(e)
catch ex
try
deregister_worker(w.id)
kill(manager, w.id, wconfig)
finally
rethrow(ex)
end
end

w = Worker(w.id, r_s, w_s, manager; config=wconfig)
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ for (addp_testf, expected_errstr, env) in testruns
old_stdout = stdout
stdout_out, stdout_in = redirect_stdout()
stdout_txt = @async filter!(readlines(stdout_out)) do s
return !startswith(s, "\tFrom failed worker startup:\t")
return !startswith(s, "\tFrom worker startup:\t")
end
try
withenv(env...) do
Expand Down

0 comments on commit aede024

Please sign in to comment.