diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl
index e1d4bae511d91..288e55197be0a 100644
--- a/stdlib/Distributed/src/cluster.jl
+++ b/stdlib/Distributed/src/cluster.jl
@@ -245,6 +245,11 @@ function redirect_worker_output(ident, stream)
     end
 end
 
+struct LaunchWorkerError <: Exception
+    msg::String
+end
+
+Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg)
 
 # The default TCP transport relies on the worker listening on a free
 # port available and printing its bind address and port.
@@ -276,7 +281,7 @@ function read_worker_host_port(io::IO)
 
             conninfo = fetch(readtask)
             if isempty(conninfo) && !isopen(io)
-                error("Unable to read host:port string from worker. Launch command exited with error?")
+                throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?"))
             end
 
             ntries -= 1
@@ -290,13 +295,13 @@ function read_worker_host_port(io::IO)
         end
         close(io)
         if ntries > 0
-            error("Timed out waiting to read host:port string from worker.")
+            throw(LaunchWorkerError("Timed out waiting to read host:port string from worker."))
         else
-            error("Unexpected output from worker launch command. Host:port string not found.")
+            throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found."))
         end
     finally
         for line in leader
-            println("\tFrom failed worker startup:\t", line)
+            println("\tFrom worker startup:\t", line)
         end
     end
 end
@@ -358,6 +363,34 @@ the package `ClusterManagers.jl`.
 The number of seconds a newly launched worker waits for connection establishment from the
 master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
 environment. Relevant only when using TCP/IP as transport.
+
+To launch workers without blocking the REPL, or the containing function
+if launching workers programmatically, execute `addprocs` in its own task.
+
+# Examples
+
+```
+# On busy clusters, call `addprocs` asynchronously
+t = @async addprocs(...)
+```
+
+```
+# Utilize workers as and when they come online
+if nprocs() > 1 # Ensure at least one new worker is available
+    .... # perform distributed execution
+end
+```
+
+```
+# Retrieve newly launched worker IDs, or any error messages
+if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
+    if nworkers() == N
+        new_pids = fetch(t)
+    else
+        fetch(t)
+    end
+end
+```
 """
 function addprocs(manager::ClusterManager; kwargs...)
     init_multi()
@@ -503,9 +536,13 @@ function create_worker(manager, wconfig)
     local r_s, w_s
     try
         (r_s, w_s) = connect(manager, w.id, wconfig)
-    catch e
-        deregister_worker(w.id)
-        rethrow(e)
+    catch ex
+        try
+            deregister_worker(w.id)
+            kill(manager, w.id, wconfig)
+        finally
+            rethrow(ex)
+        end
     end
 
     w = Worker(w.id, r_s, w_s, manager; config=wconfig)
diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl
index 383a6783a311d..1248fa840fc87 100644
--- a/stdlib/Distributed/test/distributed_exec.jl
+++ b/stdlib/Distributed/test/distributed_exec.jl
@@ -1129,7 +1129,7 @@ for (addp_testf, expected_errstr, env) in testruns
     old_stdout = stdout
     stdout_out, stdout_in = redirect_stdout()
     stdout_txt = @async filter!(readlines(stdout_out)) do s
-        return !startswith(s, "\tFrom failed worker startup:\t")
+        return !startswith(s, "\tFrom worker startup:\t")
     end
     try
         withenv(env...) do